You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

884 lines
39 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
from ITOA.setup_logging import logger
from ITOA.itoa_common import normalize_num_field, is_valid_str
from ITOA.itoa_object import ItoaObject, CRUDMethodTypes
from ITOA.storage import itoa_storage
from itsi.objects.itsi_service import ItsiService
from itsi.objects.itsi_entity import ItsiEntity
from maintenance_services.maintenance_manifest import SUPPORTED_MAINTENANCE_OBJECT_TYPES
from maintenance_services.constants import MAINTENANCE_CALENDAR_OBJECT_TYPE
from ITOA.controller_utils import ITOAError
from ITOA.itoa_exceptions import ItoaAccessDeniedError
from .utils import object_collection_mapping
_OBJECT_TYPE = MAINTENANCE_CALENDAR_OBJECT_TYPE
class MaintenanceCalendar(ItoaObject):
"""
Implements calendar configuration for moving objects into maintenance mode
"""
log_prefix = '[Maintenance Calendar] '
def __init__(self, session_key, current_user_name):
super(MaintenanceCalendar, self).__init__(
session_key,
current_user_name,
_OBJECT_TYPE,
collection_name=object_collection_mapping[_OBJECT_TYPE],
title_validation_required=True
)
def fetch_objects_without_rbac(self, object_type, object_ids):
"""
Get all the objects for given ids without RBAC filtering using ITOAStorage.
Returns all objects weather user has access to given ids or not, to verify service is deleted or user don't
have read access
Issue: filter string character limit, filter_data for large number of service(above ~8.79k) would cause
the issue and rest call would raise 414 error (too large URI).
Solution: Use batch mechanism to tackle this situation and get services in batch using batch_size 5000 services
@type object_type: string
@param object_type: string defining object type from service, entity
@type object_ids: list
@param object_ids: list of object ids to fetch the objects
@rtype: list of objects
@return: list of objects filtering based on given ids
"""
kwargs = {'collection': 'itsi_services'}
storage_interface = itoa_storage.ITOAStorage(**kwargs)
fetched_objects = []
requested_count = len(object_ids)
requested_skip = 0
batch_size = 5000
while requested_count > 0:
batch_ids = object_ids[requested_skip:requested_skip + min(requested_count, batch_size)]
results = storage_interface.get_all(
self.session_key,
'nobody',
object_type,
filter_data={'$or': [{'_key': id} for id in batch_ids]},
fields=['_key'])
if not results or len(results) == 0:
break
requested_count -= batch_size
requested_skip += batch_size
fetched_objects.extend(results)
return [fetched_object['_key'] for fetched_object in fetched_objects]
def fetch_objects(self, owner, object_type, object_ids, transaction_id):
"""
Get all the objects given for given ids with RBAC filtering.
Returns all objects if user has read access to given ids
Issue: filter string character limit, filter_data for large number of service(above ~8.79k) would cause
the issue and rest call would raise 414 error (too large URI).
Solution: Use batch mechanism to tackle this situation and get services in batch using batch_size 5000 services
@type owner: string
@param owner: "owner" user performing the config
@type object_type: string
@param object_type: string defining object type from(service, entity)
@type object_ids: set
@param object_ids: set of object ids to fetch the objects
@type transaction_id: string
@param transaction_id: unique identifier of a user transaction
@rtype: list of objects
@return: list of objects filtering based on given ids if user has access
"""
object_ids = list(object_ids)
fields = ['_key', 'sec_grp', 'permissions']
if object_type == 'service':
object_interface = ItsiService(
self.session_key, self.current_user_name)
else:
object_interface = ItsiEntity(
self.session_key, self.current_user_name)
fetched_objects = []
requested_count = len(object_ids)
requested_skip = 0
batch_size = 5000
while requested_count > 0:
batch_ids = object_ids[requested_skip:requested_skip + min(requested_count, batch_size)]
results = object_interface.get_bulk(
owner,
fields=fields,
filter_data={'$or': [{'_key': id} for id in batch_ids]},
req_source='MaintenanceCalendarGetBulk',
transaction_id=transaction_id)
if not results or len(results) == 0:
break
requested_count -= batch_size
requested_skip += batch_size
fetched_objects.extend(results)
return fetched_objects
def get_maintenance_objects_accessibility(self, owner, mw_objects, transaction_id):
"""
Get bifurcation of all the objects in maintenance window weather object if accessible to current user or not
Returns list of available and not available services and entities
@type owner: string
@param owner: "owner" user performing the config
@type mw_objects: list
@param mw_objects: list of maintenance window objects
@type transaction_id: string
@param transaction_id: unique identifier of a user transaction
@rtype: tuple of objects
@return: (dictionary of available services, list of not available services, dictionary of available entities,
list of not available entities)
"""
maintenance_service_ids = set()
maintenance_entity_ids = set()
for mw in mw_objects:
if mw and 'objects' in mw.keys():
if type(mw['objects']) is list:
for obj in mw['objects']:
if ((type(obj) is dict) and (set(['object_type', '_key']) == set(obj.keys()))
and type(obj['_key']) is str):
if obj['object_type'] == 'service':
maintenance_service_ids.add(obj['_key'])
if obj['object_type'] == 'entity':
maintenance_entity_ids.add(obj['_key'])
else:
logger.info('No objects inside Maintenance Window: ' + str(mw))
raise ITOAError(status="500", message="Object does not exist.")
accessible_services_by_ids = {}
if maintenance_service_ids:
# get the all maintenance services which current_user_name has access to.
accessible_maintenance_services = self.fetch_objects(
owner,
'service',
maintenance_service_ids,
transaction_id=transaction_id)
for service in accessible_maintenance_services:
if service['_key'] not in accessible_services_by_ids:
accessible_services_by_ids[service['_key']] = service
accessible_entities_by_ids = {}
if maintenance_entity_ids:
# get the all maintenance entities which current_user_name has access to.
accessible_maintenance_entities = self.fetch_objects(
owner,
'entity',
maintenance_entity_ids,
transaction_id=transaction_id)
for entity in accessible_maintenance_entities:
if entity['_key'] not in accessible_entities_by_ids:
accessible_entities_by_ids[entity['_key']] = entity
inaccessible_service_ids = self.fetch_objects_without_rbac(
'service', list(maintenance_service_ids - set(accessible_services_by_ids.keys())))
inaccessible_entity_ids = self.fetch_objects_without_rbac(
'entity', list(maintenance_entity_ids - set(accessible_entities_by_ids.keys())))
return (accessible_services_by_ids,
inaccessible_service_ids,
accessible_entities_by_ids,
inaccessible_entity_ids)
def get_bulk(self,
owner,
sort_key=None,
sort_dir=None,
filter_data=None,
fields=None,
skip=None,
limit=None,
req_source='get_bulk',
transaction_id=None):
"""
Overriding the itoa_object get_bulk function.
The maintenance_window object type is not intended to be 'securable' but we can still enforce
RBAC filtering.
How RBAC filtering is done?
1. Fetch all the available(readable) and not readable service or entities which are part of
all the maintenance windows.
2. For given service or entity id
if it is part of accessible_*_by_ids then logged-in user have read access
if it is part of inaccessible_*_ids then logged-in user don't have read access
if it is not part any above then it is deleted
@type: string
@param owner: "owner" user performing the config
@type sort_key: string
@param sort_key: string defining keys to sort by
@type sort_dir: string
@param sort_dir: string defining direction for sorting - asc or desc
@type filter_data: dictionary
@param filter_data: json filter constructed to filter data. Follows mongodb syntax
@type fields: list
@param fields: list of fields to retrieve, fetches all fields if not specified
@type skip: number
@param skip: number of items to skip from the start
@type limit: number
@param limit: maximum number of items to return
@type req_source: string
@param req_source: identified source initiating the operation
@type transaction_id: string
@param transaction_id: unique identifier of a user transaction
@rtype: list of dictionary
@return: objects retrieved on success, throws exceptions on errors,
list of of maintenance calendar objects filtering based on rbac
"""
return self.do_rbac_filtering(owner,
sort_key,
sort_dir,
filter_data,
fields,
skip,
limit,
req_source,
transaction_id)
def get(self, owner, object_id, req_source='unknown', transaction_id=None):
"""
Overriding the itoa_object get function.
The maintenance_window object type is not intended to be 'securable' but we can still enforce
RBAC filtering.
How RBAC filtering is done?
1. Fetch all the available(readable) and not readable service or entities which are part of
the maintenance Window.
2. For given service or entity id
if it is part of accessible_*_by_ids then logged-in user have read access
if it is part of inaccessible_*_ids then logged-in user don't have read access
if it is not part any above then it is deleted
@type: string
@param owner: "owner" user performing the config
@type object_id: string
@param object_id: id of object to retrieve
@type req_source: string
@param req_source: identified source initiating the operation
@type transaction_id: string
@param transaction_id: unique identifier of a user transaction
@rtype: dictionary
@return: object matching id on success, empty rows if object is not found, throws exceptions on errors
"""
maintenance_object = ItoaObject.get(self,
owner,
object_id,
req_source,
transaction_id)
(accessible_services_by_ids,
inaccessible_service_ids,
accessible_entities_by_ids,
inaccessible_entity_ids) = self.get_maintenance_objects_accessibility(
owner,
[maintenance_object],
transaction_id)
if not maintenance_object:
raise ITOAError(status="500", message="Object does not exist.")
objects = maintenance_object.get('objects')
if objects:
first_object = objects[0]
if first_object:
# maintenance object are either all services or either all entities,
# evaluating the first object is enough.
maintenance_object_type = first_object.get('object_type')
maintenance_object["can_edit"] = True
if maintenance_object_type == 'service':
accessible_object_by_ids = accessible_services_by_ids
inaccessible_object_ids = inaccessible_service_ids
elif maintenance_object_type == 'entity':
accessible_object_by_ids = accessible_entities_by_ids
inaccessible_object_ids = inaccessible_entity_ids
for _object in objects:
object_key = _object.get('_key')
if object_key in accessible_object_by_ids:
fetched_object = accessible_object_by_ids.get(object_key)
if ('permissions' in fetched_object
and 'write' in fetched_object['permissions']
and not fetched_object['permissions']['write']):
maintenance_object["can_edit"] = False
break
elif object_key in inaccessible_object_ids:
raise ItoaAccessDeniedError(
'Access denied. You do not have permission to access this object.', logger)
return maintenance_object
def do_rbac_filtering(self,
owner,
sort_key=None,
sort_dir=None,
filter_data=None,
fields=None,
skip=None,
limit=None,
req_source='do_rbac_filtering',
transaction_id=None):
"""
Filtering out the maintenance calendar objects which contains:
- services that current user doesn't have access to
- entities is in default sec_grp, all user will have access to it.
How is this done?
1. Fetch all the maintenance calendar objects without limit and offset
2. Fetch the services visible to current user
3. Fetch just the first entity to obtain the sec_grp info (which is global)
4. Compare the security_groups_ids of objects in the maintenance calendar objects
against the services security groups user can see.
How RBAC filtering is done?
1. Fetch all the available(readable) and not readable service or entities which are part of
all the maintenance windows.
2. For given service or entity id
if it is part of accessible_*_by_ids then logged-in user have read access
if it is part of inaccessible_*_ids then logged-in user don't have read access
if it is not part any above then it is deleted
@type: string
@param owner: "owner" user performing the config
@type sort_key: string
@param sort_key: string defining keys to sort by
@type sort_dir: string
@param sort_dir: string defining direction for sorting - asc or desc
@type filter_data: dictionary
@param filter_data: json filter constructed to filter data. Follows mongodb syntax
@type fields: list
@param fields: list of fields to retrieve, fetches all fields if not specified
@type skip: number
@param skip: number of items to skip from the start
@type limit: number
@param limit: maximum number of items to return
@type req_source: string
@param req_source: identified source initiating the operation
@type transaction_id: string
@param transaction_id: unique identifier of a user transaction
@rtype: list of dictionary
@return: objects retrieved on success, throws exceptions on errors,
list of of maintenance calendar objects filtering based on rbac
"""
# get all the maintenance_calendar objects by
# enforcing skip = None and limit = None to get them all
if fields is not None and 'objects' not in fields:
fields.append('objects')
if fields is not None and 'sec_grp_list' not in fields:
fields.append('sec_grp_list')
maintenance_objects_full_list = ItoaObject.get_bulk(self,
owner,
sort_key,
sort_dir,
filter_data,
fields,
None,
None,
req_source,
transaction_id)
if len(maintenance_objects_full_list) > 0:
(accessible_services_by_ids,
inaccessible_service_ids,
accessible_entities_by_ids,
_) = self.get_maintenance_objects_accessibility(owner, maintenance_objects_full_list, transaction_id)
services_sec_grp_ids = []
for _, service in accessible_services_by_ids.items():
sec_grp = service["sec_grp"]
if sec_grp not in services_sec_grp_ids:
services_sec_grp_ids.append(sec_grp)
entities_sec_grp_ids = []
for _, entity in accessible_entities_by_ids.items():
sec_grp = entity["sec_grp"]
if sec_grp not in entities_sec_grp_ids:
entities_sec_grp_ids.append(sec_grp)
maintenance_objects_filtered_list = []
for maintenance_object in maintenance_objects_full_list:
# default value
maintenance_object_type = None
objects = maintenance_object.get('objects', [])
if objects:
first_object = objects[0]
if first_object:
# maintenance object are either all services or either all entities,
# evaluating the first object is enough.
maintenance_object_type = first_object.get('object_type')
# default value
maintenance_object_sec_grp_list = maintenance_object.get('sec_grp_list', [])
# at least one service/entity for which user have access to is enough to turn this flag True
can_see_maintenance_object = False
if maintenance_object_type == 'service':
for maintenance_object_sec_grp in maintenance_object_sec_grp_list:
if maintenance_object_sec_grp in services_sec_grp_ids:
can_see_maintenance_object = True
break
elif maintenance_object_type == 'entity':
for maintenance_object_sec_grp in maintenance_object_sec_grp_list:
if maintenance_object_sec_grp in entities_sec_grp_ids:
can_see_maintenance_object = True
break
if can_see_maintenance_object or len(maintenance_object_sec_grp_list) == 0:
# at least one service/entity for which user doesn't have access to is enough
# to turn this flag False
can_edit_maintenance_object = True
if maintenance_object_type == 'service':
for _object in objects:
object_key = _object.get('_key')
if object_key in accessible_services_by_ids:
service = accessible_services_by_ids[_object.get('_key')]
if not service["permissions"]['write']:
can_edit_maintenance_object = False
break
elif object_key in inaccessible_service_ids:
can_edit_maintenance_object = False
break
elif maintenance_object_type == 'entity':
# all entities belong to the Global team, just check one
for key, entity in accessible_entities_by_ids.items():
entity_permission = entity.get('permissions')
if entity_permission:
write = entity_permission.get('write', False)
if not write:
can_edit_maintenance_object = False
else:
can_edit_maintenance_object = False
break
else:
can_edit_maintenance_object = False
maintenance_object['can_edit'] = can_edit_maintenance_object
maintenance_objects_filtered_list.append(maintenance_object)
skip = int(skip) if skip is not None else 0
limit = int(limit) if limit is not None else 0
if limit == 0 and skip == 0:
return maintenance_objects_filtered_list
else:
return maintenance_objects_filtered_list[skip:limit + skip]
return maintenance_objects_full_list
def delete_bulk(self, owner, filter_data=None, req_source='unknown', transaction_id=None):
"""
Deletes objects matching criteria, if no filtering specified, deletes all objects of this object type
How RBAC filtering is done?
1. Fetch all the available(readable) and not readable service or entities which are part of
all the maintenance windows.
2. For given service or entity id
if it is part of accessible_*_by_ids then logged-in user have read access
if it is part of inaccessible_*_ids then logged-in user don't have read access
if it is not part any above then it is deleted
@type owner: string
@param owner: user who is performing this operation
@type filter_data: dictionary
@param filter_data: json filter constructed to filter data. Follows mongodb syntax
@type req_source: string
@param req_source: identified source initiating the operation
@return: none, throws exceptions on errors
"""
# Get ids for object which is getting deleted
transaction_id = self._instrumentation.push("MaintenanceCalendar.delete_bulk",
transaction_id=transaction_id, owner=owner)
delete_objects = self.storage_interface.get_all(self.session_key,
owner,
self.object_type,
filter_data=filter_data,
current_user_name=self.current_user_name,
fields=['_key', 'acl', 'sec_grp', 'objects'])
delete_data = []
if isinstance(delete_objects, list):
(accessible_services_by_ids,
inaccessible_service_ids,
accessible_entities_by_ids,
inaccessible_entity_ids) = self.get_maintenance_objects_accessibility(
owner,
delete_objects,
transaction_id)
for object in delete_objects:
object_id = object.get('_key')
deletable = True
object = ItoaObject.get(self,
owner,
object_id,
req_source,
transaction_id)
if not object:
raise ITOAError(status="500", message="Object does not exist.")
maintenance_objects = object.get('objects')
maintenance_object_type = maintenance_objects[0]['object_type']
if maintenance_object_type == 'service':
accessible_object_by_ids = accessible_services_by_ids
inaccessible_object_ids = inaccessible_service_ids
elif maintenance_object_type == 'entity':
accessible_object_by_ids = accessible_entities_by_ids
inaccessible_object_ids = inaccessible_entity_ids
# check if user has read and write access to all the objects in maintenance window
for maintenance_object in maintenance_objects:
maintenance_object_type = maintenance_object['object_type']
object_key = maintenance_object.get('_key')
if object_key in accessible_object_by_ids:
fetched_object = accessible_object_by_ids.get(object_key)
if ('permissions' in fetched_object
and 'write' in fetched_object['permissions']
and not fetched_object['permissions']['write']):
deletable = False
break
elif object_key in inaccessible_object_ids:
deletable = False
logger.debug(
'Access denied. Object type: %s. Object id: %s. User: %s' % (maintenance_object_type,
maintenance_object.get('_key'),
self.current_user_name))
if deletable:
delete_data.append({'_key': object_id, 'object_type': self.object_type})
del delete_objects
if len(delete_data) > 0:
# Construct filter to only delete objects that user has access for deleting
deletable_objects_filter = self.get_filter_data_for_keys([object.get('_key') for object in delete_data])
is_delete_needed = True
security_enforcer = self._get_security_enforcer()
if self.object_type == security_enforcer.object_type:
if not (len(delete_data) == 1
and delete_data[0].get('_key') == security_enforcer.get_default_itsi_security_group_key()):
deletable_objects_filter = self.get_filter_data_for_keys(
[object.get('_key') for object in delete_data
if object.get('_key') != security_enforcer.get_default_itsi_security_group_key()])
else:
# There is nothing to delete, return
is_delete_needed = False
logger.debug(
'No objects of type %s deleted, request source: %s',
self.object_type,
req_source
)
if is_delete_needed:
self.storage_interface.delete_all(
self.session_key,
owner,
self.object_type,
filter_data=deletable_objects_filter,
current_user_name=self.current_user_name)
logger.debug(
'Objects of type %s deleted, request source: %s',
self.object_type,
req_source
)
# else all objects got filtered out, dont delete any
self._instrumentation.pop("MaintenanceCalendar.delete_bulk", transaction_id)
def delete(self, owner, object_id, req_source='unknown', transaction_id=None):
"""
Delete object by id
How RBAC filtering is done?
1. Fetch all the available(readable) and not readable service or entities which are part of
the maintenance window
2. For given service or entity id
if it is part of accessible_*_by_ids then logged-in user have read access
if it is part of inaccessible_*_ids then logged-in user don't have read access
if it is not part any above then it is deleted
@type owner: string
@param owner: user who is performing this operation
@type object_id: string
@param object_id: id of object to delete
@type req_source: string
@param req_source: identified source initiating the operation
@rtype: string
@return: id of object deleted on success, throws exceptions on errors
"""
transaction_id = self._instrumentation.push("MaintenanceCalendar.delete", transaction_id=transaction_id,
owner=owner)
if not is_valid_str(object_id):
self.raise_error_bad_validation(logger, 'Cannot delete object with invalid object ID.')
object = ItoaObject.get(self,
owner,
object_id,
req_source,
transaction_id)
if not object:
raise ITOAError(status="500", message="Object does not exist.")
maintenance_objects = object.get('objects')
(accessible_services_by_ids,
inaccessible_service_ids,
accessible_entities_by_ids,
inaccessible_entity_ids) = self.get_maintenance_objects_accessibility(owner, [object], transaction_id)
maintenance_object_type = maintenance_objects[0]['object_type']
if maintenance_object_type == 'service':
accessible_object_by_ids = accessible_services_by_ids
inaccessible_object_ids = inaccessible_service_ids
elif maintenance_object_type == 'entity':
accessible_object_by_ids = accessible_entities_by_ids
inaccessible_object_ids = inaccessible_entity_ids
for maintenance_object in maintenance_objects:
object_key = maintenance_object.get('_key')
if object_key in accessible_object_by_ids:
fetched_object = accessible_object_by_ids.get(object_key)
if ('permissions' in fetched_object
and 'write' in fetched_object['permissions']
and not fetched_object['permissions']['write']):
raise ITOAError(status='403', message='Permission denied.')
elif object_key in inaccessible_object_ids:
raise ITOAError(status="403", message="Permission denied.")
results = self.storage_interface.delete(
self.session_key,
owner,
self.object_type,
object_id,
current_user_name=self.current_user_name)
logger.debug('Object of type %s with ID: %s deleted, request source: %s',
self.object_type,
object_id,
req_source
)
self._instrumentation.pop("MaintenanceCalendar.delete", transaction_id)
return results
def do_additional_setup(self, owner, objects, req_source='unknown', method=CRUDMethodTypes.METHOD_UPSERT,
transaction_id=None, skip_local_failure=False):
"""
Additional setup performed during edit/create operations.
Primarily involves validation of calendar schema.
How RBAC filtering is done?
1. Fetch all the available(readable) and not readable service or entities which are part of
all the maintenance windows.
2. For given service or entity id
if it is part of accessible_*_by_ids then logged-in user have read access
if it is part of inaccessible_*_ids then logged-in user don't have read access
if it is not part any above then it is deleted
@type: string
@param owner: "owner" user performing the config
@type: list of dict
@param objects: list of calendars being configured as an array of JSON specifications
@type: string
@param req_source: source initiating this operation, for tracking
@type: string
@param method: type of CRUD operation being performed
@type transaction_id: string
@param transaction_id: unique identifier of a user transaction
@rtype: None
@return: None
"""
(accessible_services_by_ids,
inaccessible_service_ids,
accessible_entities_by_ids,
inaccessible_entity_ids) = self.get_maintenance_objects_accessibility(owner, objects, transaction_id)
def maintenance_object_validation(maintenance_object, expected_maintenance_object_type):
if (
(not isinstance(maintenance_object, dict))
or ('_key' not in maintenance_object)
or ('object_type' not in maintenance_object)
):
self.raise_error_bad_validation(
logger,
'At least one object specified in invalid. '
'Must specify a key and an object type for each object.'
)
if not is_valid_str(maintenance_object['_key']):
self.raise_error_bad_validation(
logger,
'At least one object specified with invalid key. Must specify a valid key.'
)
maintenance_object_type = maintenance_object['object_type']
if (
(not is_valid_str(maintenance_object_type))
or (maintenance_object_type not in SUPPORTED_MAINTENANCE_OBJECT_TYPES)
):
self.raise_error_bad_validation(
logger,
'Invalid object types specified. Supported object types are: ' + str(
SUPPORTED_MAINTENANCE_OBJECT_TYPES
)
)
for json_data in objects:
# Assume json_data is valid
# Validate time fields, expected to be in epoch time format = float
if (json_data.get('start_time') is None) or (json_data.get('end_time') is None):
self.raise_error_bad_validation(
logger,
'Start time and end time are mandatory fields. Must specify both.'
)
normalize_num_field(json_data, 'start_time', numclass=float)
normalize_num_field(json_data, 'end_time', numclass=float)
start_time = json_data['start_time']
end_time = json_data['end_time']
if (
(not isinstance(start_time, float))
or (not isinstance(end_time, float))
or (start_time < 0)
or (end_time < 0)
):
self.raise_error_bad_validation(
logger,
'Start time and end time must be valid epoch time. Check the values.'
)
if start_time >= end_time:
self.raise_error_bad_validation(
logger,
'Start time must be earlier than end time. Check the values.'
)
# Validate objects
maintenance_objects = json_data.get('objects')
if (not isinstance(maintenance_objects, list)) or (len(maintenance_objects) < 1):
self.raise_error_bad_validation(
logger,
'Objects specified must be a valid non-empty list. Must specify at least one object.'
)
if type(maintenance_objects[0]) is not dict:
self.raise_error_bad_validation(
logger,
'Objects specified must be a valid non-empty list. Must specify at least one object.'
)
if 'object_type' not in maintenance_objects[0].keys():
self.raise_error_bad_validation(
logger,
'Objects specified must be a valid non-empty list. Must specify at least one object.'
)
expected_maintenance_object_type = maintenance_objects[0]['object_type']
if (
(not is_valid_str(expected_maintenance_object_type))
or (expected_maintenance_object_type not in SUPPORTED_MAINTENANCE_OBJECT_TYPES)
):
self.raise_error_bad_validation(
logger,
'Invalid object types specified. Supported object types are: ' + str(
SUPPORTED_MAINTENANCE_OBJECT_TYPES
)
)
associated_objects_keys = []
if expected_maintenance_object_type == 'service':
accessible_object_by_ids = accessible_services_by_ids
inaccessible_object_ids = inaccessible_service_ids
elif expected_maintenance_object_type == 'entity':
accessible_object_by_ids = accessible_entities_by_ids
inaccessible_object_ids = inaccessible_entity_ids
for maintenance_object in maintenance_objects:
maintenance_object_validation(maintenance_object, expected_maintenance_object_type)
# We will skip validation of whether objects specified actually exist since stale objects could
# exist in stale/expired config and are better off being preserved for tracking purposes.
# Configuring maintenance is also simpler to use this way.
object_key = maintenance_object.get('_key')
if object_key in accessible_object_by_ids:
fetched_object = accessible_object_by_ids.get(object_key)
if ('permissions' in fetched_object
and 'write' in fetched_object['permissions']
and not fetched_object['permissions']['write']):
raise ITOAError(status='403', message='Permission denied.')
if fetched_object.get('sec_grp') and not fetched_object['sec_grp'] in associated_objects_keys:
associated_objects_keys.append(fetched_object['sec_grp'])
elif object_key in inaccessible_object_ids:
raise ITOAError(status="403", message="Permission denied.")
# len(associated_objects_keys) == 0 should not append at maintenance_calendar creation
# but it can happen when creating a maintenance_calendar with services and entities
# that doesn't exist => test_maintenance_services_interface.py
if len(associated_objects_keys) == 0:
json_data['sec_grp_list'] = [self._get_security_enforcer().get_default_itsi_security_group_key()]
else:
json_data['sec_grp_list'] = associated_objects_keys