You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

232 lines
9.7 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import itsi_py3
from enum import Enum
from .itsi_service import ItsiService
from ITOA import itoa_common as utils
from ITOA.itoa_object import CRUDMethodTypes
from ITOA.setup_logging import logger
from itsi.objects.itsi_sandbox import ItsiSandbox, Status
SANDBOX_SERVICES_LIMIT = 250
class SandboxServiceStatus(int, Enum):
    """Sync status of a single service contained in an ITSI sandbox."""
    STATUS_SERVICE_SYNC_PENDING = 0
    STATUS_SERVICE_SYNC_COMPLETED = 1
    STATUS_SERVICE_SYNC_FAILED = 2
    STATUS_SERVICE_SYNC_UNKNOWN = 3

    @staticmethod
    def pretty_string(status=STATUS_SERVICE_SYNC_UNKNOWN):
        """
        Return pretty string of the status.

        Fixed: comparisons previously used the unrelated sandbox ``Status``
        enum (imported from itsi_sandbox); they now use this enum's own
        members, which carry the matching STATUS_SERVICE_SYNC_* names.

        @type status: SandboxServiceStatus or int
        @param status: sync status of the sandbox service
        @rtype: string
        @return: human-readable label, 'Unknown' for unrecognized values
        """
        # int-mixin Enum members hash/compare equal to their int values,
        # so plain ints are valid lookup keys here as well.
        labels = {
            SandboxServiceStatus.STATUS_SERVICE_SYNC_PENDING: 'Sandbox Service Sync Pending',
            SandboxServiceStatus.STATUS_SERVICE_SYNC_COMPLETED: 'Sandbox Service Sync Completed',
            SandboxServiceStatus.STATUS_SERVICE_SYNC_FAILED: 'Sandbox Service Sync Failed',
            SandboxServiceStatus.STATUS_SERVICE_SYNC_UNKNOWN: 'Sandbox Service Sync N/A',
        }
        return labels.get(status, 'Unknown')
class ItsiSandboxService(ItsiService):
    """
    Implements ITSI Sandbox Service object.

    Sandbox services are copies of services stored in their own KV store
    collection ('itsi_sandbox_service') and tied to a parent sandbox
    through their 'sandbox_id' field. Several regular-service side effects
    (saved search updates, post-save setup, post-delete handling) are
    deliberately no-ops for sandboxed copies.
    """
    logger = logger
    log_prefix = '[ITSI Sandbox Service] '
    collection_name = 'itsi_sandbox_service'
    object_type = 'sandbox_service'
    # Field name that links a sandbox service back to its owning sandbox
    sandbox_id = 'sandbox_id'
    SANDBOX_SERVICE_SPECIFIC_FIELDS = ['sandbox_id', 'kpis.severity_name']
    # Fields to remove from sandbox services on restores
    SANDBOX_SERVICE_RESTORE_EXCLUDE_FIELDS = ['is_created_from_sandbox', 'publish_history']

    def __init__(self, session_key, current_user_name):
        """
        @type session_key: string
        @param session_key: splunkd session key used for backend access
        @type current_user_name: string
        @param current_user_name: user performing the operations
        """
        logger.info('Instantiating Sandbox Service Object')
        self.session_key = session_key
        self.current_user_name = current_user_name
        super(ItsiSandboxService, self).__init__(
            session_key,
            current_user_name,
            self.object_type,
            collection_name=self.collection_name,
            is_securable_object=True,
        )
        # ItsiService attributes
        self.synchronous = True
        # set the flag to false by default. Set to true when using the publish sandbox using save_batch_publish
        self.is_publish_flow = False
        self.sandbox_interface = ItsiSandbox(self.session_key, self.current_user_name)

    def do_object_validation(
        self, owner, objects, validate_name=True, dupname_tag=None, transaction_id=None, skip_local_failure=False,
        ignore_same_key=False,
    ):
        """
        Run the regular service validation, then additionally require every
        object to carry a sandbox_id.

        Raises a 409 bad-validation error when any object lacks sandbox_id.
        """
        super(ItsiSandboxService, self).do_object_validation(
            owner,
            objects,
            validate_name,
            dupname_tag,
            transaction_id,
            skip_local_failure,
            ignore_same_key=ignore_same_key,
        )
        for json_data in objects:
            # Every sandbox service must be linked to a sandbox
            if not json_data.get(self.sandbox_id):
                self.raise_error_bad_validation(
                    logger,
                    'Sandbox ID not provided: Please provide the sandbox ID before proceeding.',
                    409,
                )

    def add_filtering(self, filter_data, objects):
        """
        Extending to add additional filtering for Sandbox id.

        @type filter_data: list of dictionary
        @param filter_data: filter clauses (mongodb syntax), OR-ed together
        @type objects: list of dictionary
        @param objects: list of itoa_objects; the sandbox id is read from
            the first one
        @return: dictionary of filter data scoped to the objects' sandbox
        """
        # Add additional filter condition to ensure that the persisted data
        # is fetched from the correct Service Sandbox. All objects in one
        # request belong to the same sandbox, so the first object's
        # sandbox_id suffices. Guard against an empty list, which
        # previously raised NameError (sandbox_id_value unbound).
        sandbox_id_value = objects[0].get(self.sandbox_id) if objects else None
        return {'$and': [{'sandbox_id': sandbox_id_value}, {'$or': filter_data}]}

    def update_savedsearches(self, service_key, saved_search_changed_kpis, transaction_id=None):
        # Sandbox services do not own saved searches; intentionally a no-op.
        return

    def get_bulk_skip_enforce_security(
        self, owner, sort_key=None, sort_dir=None, filter_data=None, fields=None, skip=None, limit=None,
        req_source='unknown', transaction_id=None,
    ):
        """
        Retrieves objects with no security criteria, matching criteria. If no filtering is specified, retrieves all
        objects of this object type.
        @type owner: string
        @param owner: user who is performing this operation
        @type sort_key: string
        @param sort_key: string defining keys to sort by
        @type sort_dir: string
        @param sort_dir: string defining direction for sorting - asc or desc
        @type filter_data: dictionary
        @param filter_data: json filter constructed to filter data. Follows mongodb syntax
        @type fields: list
        @param fields: list of fields to retrieve, fetches all fields if not specified
        @type skip: number
        @param skip: number of items to skip from the start
        @type limit: number
        @param limit: maximum number of items to return
        @type req_source: string
        @param req_source: identified source initiating the operation
        @rtype: list of dictionary
        @return: objects retrieved on success, throws exceptions on errors
        """
        transaction_id = self._instrumentation.push("itsi_sandbox_service.get_bulk_skip_enforce_security",
                                                    transaction_id=transaction_id, owner=owner)
        results = self.do_paged_get_bulk(owner, sort_key=sort_key, sort_dir=sort_dir, filter_data=filter_data,
                                         fields=fields, limit=limit, skip=skip, skip_enforce_security=True,
                                         transaction_id=transaction_id)
        # Results may come back as a list or a single dict depending on the
        # backend; normalize the count for logging/instrumentation.
        number_of_objects = len(results) if utils.is_valid_list(
            results) else 1 if utils.is_valid_dict(results) else 0
        logger.debug('%s objects of type %s retrieved, request source: %s',
                     number_of_objects,
                     self.object_type,
                     req_source,
                     )
        self._instrumentation.pop("itsi_sandbox_service.get_bulk_skip_enforce_security", transaction_id,
                                  metric_info={"numberOfObjects": number_of_objects})
        return results

    def identify_dependencies(self, owner, objects, method, req_source='unknown', transaction_id=None,
                              skip_local_failure=False, dry_run=False):
        """
        Removes the refresh queue job created for linking service in base service template object
        """
        is_refresh_required, refresh_jobs = super().identify_dependencies(owner, objects, method, req_source,
                                                                          transaction_id, skip_local_failure, dry_run)
        # added service_entities_update and update_shared_base_search jobs to exclude list as they are taking time in order to fix ITSI-30073
        refresh_queue_exclusion_list = ['link_service_with_base_service_template', 'service_kpi_backfill_enabled',
                                        'service_kpi_deletion', 'service_entities_update', 'update_shared_base_search',
                                        'modify_kpi_search_type', 'service_kpi_update_alert_period', 'service_kpi_at',
                                        'service_kpi_ad', 'service_kpi_cad']
        refresh_jobs = [item for item in refresh_jobs if item.get(
            'change_type') not in refresh_queue_exclusion_list]
        # Recompute after filtering; the superclass flag may be stale now
        is_refresh_required = len(refresh_jobs) > 0
        return is_refresh_required, refresh_jobs

    def post_save_setup(self, owner, ids, services, req_source='unknown', method=CRUDMethodTypes.METHOD_UPSERT,
                        transaction_id=None, skip_local_failure=False, dry_run=False, **kwargs):
        # Regular-service post-save work is not needed for sandbox copies.
        return

    def delete_bulk(
        self,
        owner,
        filter_data=None,
        req_source='unknown',
        transaction_id=None
    ):
        """
        Deletes objects matching criteria, if no filtering specified, deletes all objects of this object type
        @type owner: string
        @param owner: user who is performing this operation
        @type filter_data: dictionary
        @param filter_data: json filter constructed to filter data. Follows mongodb syntax
        @type req_source: string
        @param req_source: identified source initiating the operation
        @return: none, throws exceptions on errors
        """
        # Get ids for object which is getting deleted
        delete_objects = self.get_bulk(owner, filter_data=filter_data, fields=self.delete_object_fields)
        if delete_objects:
            self.delete_batch(owner, delete_objects, req_source, transaction_id)

    def post_delete(self, owner, deleted_service_ids, req_source='unknown',
                    method=CRUDMethodTypes.METHOD_UPSERT, transaction_id=None):
        # Regular-service post-delete work is not needed for sandbox copies.
        return

    def create_refresh_jobs(self, refresh_jobs):
        """
        Use the synchronous flag defined in __init__ to determine if the refresh jobs are run synchronously or asynchronously
        Creates a refresh job for this object type based on passed in refresh requests
        @type refresh_jobs: list of dictionary
        @param refresh_jobs: refresh job metadata for jobs needed to be created
        @return: none, throws exceptions on errors
        """
        super().create_refresh_jobs(refresh_jobs, self.synchronous)

    def allow_additional_patching(self):
        """
        Overriding the default in the itoa_object from False to True
        Sandbox Service will be supporting additional patching logic
        """
        return True