# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.

from ITOA.setup_logging import logger
from ITOA.itoa_common import normalize_num_field, is_valid_str
from ITOA.itoa_object import ItoaObject, CRUDMethodTypes
from ITOA.storage import itoa_storage
from itsi.objects.itsi_service import ItsiService
from itsi.objects.itsi_entity import ItsiEntity
from maintenance_services.maintenance_manifest import SUPPORTED_MAINTENANCE_OBJECT_TYPES
from maintenance_services.constants import MAINTENANCE_CALENDAR_OBJECT_TYPE
from ITOA.controller_utils import ITOAError
from ITOA.itoa_exceptions import ItoaAccessDeniedError
from .utils import object_collection_mapping

_OBJECT_TYPE = MAINTENANCE_CALENDAR_OBJECT_TYPE

# Maximum number of ids placed in a single ``$or`` filter. Larger filters are
# split into several requests to avoid a 414 (URI too large) response.
_FETCH_BATCH_SIZE = 5000


def _iter_key_filters(object_ids, batch_size=_FETCH_BATCH_SIZE):
    """
    Yield one mongodb-style ``$or`` filter on ``_key`` per batch of ids.

    @type object_ids: list
    @param object_ids: ids to split into batches

    @type batch_size: int
    @param batch_size: maximum number of ids per yielded filter

    @rtype: generator of dict
    @return: filters of the form {'$or': [{'_key': <id>}, ...]}
    """
    for start in range(0, len(object_ids), batch_size):
        yield {'$or': [{'_key': key} for key in object_ids[start:start + batch_size]]}


def _is_write_denied(fetched_object):
    """
    Tell whether a fetched service/entity explicitly denies write access.

    An object without a 'permissions' entry, or whose permissions carry no
    'write' entry, is NOT reported as denied.

    @type fetched_object: dict
    @param fetched_object: service or entity as returned by fetch_objects

    @rtype: bool
    @return: True only when permissions['write'] is present and falsy
    """
    return bool(
        'permissions' in fetched_object
        and 'write' in fetched_object['permissions']
        and not fetched_object['permissions']['write']
    )


class MaintenanceCalendar(ItoaObject):
    """
    Implements calendar configuration for moving objects into maintenance mode
    """

    log_prefix = '[Maintenance Calendar] '

    def __init__(self, session_key, current_user_name):
        super(MaintenanceCalendar, self).__init__(
            session_key,
            current_user_name,
            _OBJECT_TYPE,
            collection_name=object_collection_mapping[_OBJECT_TYPE],
            title_validation_required=True
        )

    def fetch_objects_without_rbac(self, object_type, object_ids):
        """
        Get the keys of all existing objects for the given ids without RBAC
        filtering, using ITOAStorage directly as user 'nobody'.

        Used to tell apart objects the user cannot read from objects that no
        longer exist: an id that is returned here exists in storage.

        The ids are queried in batches of _FETCH_BATCH_SIZE because a filter
        holding too many ids makes the REST call fail with a 414 error
        (too large URI). Every batch is queried, even when an earlier batch
        returned nothing.

        @type object_type: string
        @param object_type: string defining object type from service, entity

        @type object_ids: list
        @param object_ids: list of object ids to fetch the objects

        @rtype: list of strings
        @return: '_key' of every object found for the given ids
        """
        storage_interface = itoa_storage.ITOAStorage(collection='itsi_services')
        fetched_keys = []
        for key_filter in _iter_key_filters(object_ids):
            results = storage_interface.get_all(
                self.session_key,
                'nobody',
                object_type,
                filter_data=key_filter,
                fields=['_key'])
            # An empty batch only means none of *these* ids exist; keep going.
            if results:
                fetched_keys.extend(result['_key'] for result in results)
        return fetched_keys

    def fetch_objects(self, owner, object_type, object_ids, transaction_id):
        """
        Get all the objects for the given ids with RBAC filtering, i.e. through
        the ItsiService / ItsiEntity interfaces bound to the current user.

        The ids are queried in batches of _FETCH_BATCH_SIZE because a filter
        holding too many ids makes the REST call fail with a 414 error
        (too large URI). Every batch is queried, even when an earlier batch
        returned nothing.

        @type owner: string
        @param owner: "owner" user performing the config

        @type object_type: string
        @param object_type: 'service' selects ItsiService, anything else
            selects ItsiEntity

        @type object_ids: set
        @param object_ids: set of object ids to fetch the objects

        @type transaction_id: string
        @param transaction_id: unique identifier of a user transaction

        @rtype: list of objects
        @return: objects returned by get_bulk for the given ids, restricted to
            the fields '_key', 'sec_grp' and 'permissions'
        """
        object_ids = list(object_ids)
        fields = ['_key', 'sec_grp', 'permissions']
        if object_type == 'service':
            object_interface = ItsiService(self.session_key, self.current_user_name)
        else:
            object_interface = ItsiEntity(self.session_key, self.current_user_name)

        fetched_objects = []
        for key_filter in _iter_key_filters(object_ids):
            results = object_interface.get_bulk(
                owner,
                fields=fields,
                filter_data=key_filter,
                req_source='MaintenanceCalendarGetBulk',
                transaction_id=transaction_id)
            # An empty batch only means none of *these* ids are readable; keep going.
            if results:
                fetched_objects.extend(results)
        return fetched_objects

    def get_maintenance_objects_accessibility(self, owner, mw_objects, transaction_id):
        """
        Split all the objects referenced by the given maintenance windows by
        whether they are accessible to the current user or not.

        Only entries of a window's 'objects' list that are dicts with exactly
        the keys 'object_type' and '_key' (and a string '_key') are considered.

        @type owner: string
        @param owner: "owner" user performing the config

        @type mw_objects: list
        @param mw_objects: list of maintenance window objects

        @type transaction_id: string
        @param transaction_id: unique identifier of a user transaction

        @rtype: tuple of objects
        @return: (dictionary of accessible services keyed by '_key',
            list of keys of existing but not accessible services,
            dictionary of accessible entities keyed by '_key',
            list of keys of existing but not accessible entities)

        @raise ITOAError: when a maintenance window is empty/None or has no
            'objects' key
        """
        maintenance_service_ids = set()
        maintenance_entity_ids = set()
        expected_keys = {'object_type', '_key'}

        for mw in mw_objects:
            if not (mw and 'objects' in mw):
                logger.info('No objects inside Maintenance Window: ' + str(mw))
                raise ITOAError(status="500", message="Object does not exist.")
            if not isinstance(mw['objects'], list):
                continue
            for obj in mw['objects']:
                if not (isinstance(obj, dict)
                        and expected_keys == set(obj.keys())
                        and isinstance(obj['_key'], str)):
                    continue
                if obj['object_type'] == 'service':
                    maintenance_service_ids.add(obj['_key'])
                elif obj['object_type'] == 'entity':
                    maintenance_entity_ids.add(obj['_key'])

        accessible_services_by_ids = {}
        if maintenance_service_ids:
            # get all the maintenance services which current_user_name has access to.
            for service in self.fetch_objects(
                    owner, 'service', maintenance_service_ids, transaction_id=transaction_id):
                # the first object seen for a key wins
                accessible_services_by_ids.setdefault(service['_key'], service)

        accessible_entities_by_ids = {}
        if maintenance_entity_ids:
            # get all the maintenance entities which current_user_name has access to.
            for entity in self.fetch_objects(
                    owner, 'entity', maintenance_entity_ids, transaction_id=transaction_id):
                accessible_entities_by_ids.setdefault(entity['_key'], entity)

        # Whatever was not readable with RBAC but still exists in storage is
        # "inaccessible"; ids found by neither lookup are treated as deleted.
        inaccessible_service_ids = self.fetch_objects_without_rbac(
            'service', list(maintenance_service_ids - set(accessible_services_by_ids)))
        inaccessible_entity_ids = self.fetch_objects_without_rbac(
            'entity', list(maintenance_entity_ids - set(accessible_entities_by_ids)))

        return (accessible_services_by_ids, inaccessible_service_ids,
                accessible_entities_by_ids, inaccessible_entity_ids)

    def get_bulk(self, owner, sort_key=None, sort_dir=None, filter_data=None, fields=None, skip=None,
                 limit=None, req_source='get_bulk', transaction_id=None):
        """
        Overriding the itoa_object get_bulk function.
        The maintenance_window object type is not intended to be 'securable' but we can still
        enforce RBAC filtering. All the work is delegated to do_rbac_filtering.

        @type: string
        @param owner: "owner" user performing the config

        @type sort_key: string
        @param sort_key: string defining keys to sort by

        @type sort_dir: string
        @param sort_dir: string defining direction for sorting - asc or desc

        @type filter_data: dictionary
        @param filter_data: json filter constructed to filter data. Follows mongodb syntax

        @type fields: list
        @param fields: list of fields to retrieve, fetches all fields if not specified

        @type skip: number
        @param skip: number of items to skip from the start

        @type limit: number
        @param limit: maximum number of items to return

        @type req_source: string
        @param req_source: identified source initiating the operation

        @type transaction_id: string
        @param transaction_id: unique identifier of a user transaction

        @rtype: list of dictionary
        @return: objects retrieved on success, throws exceptions on errors,
            list of maintenance calendar objects filtered based on rbac
        """
        return self.do_rbac_filtering(owner, sort_key, sort_dir, filter_data, fields, skip, limit,
                                      req_source, transaction_id)

    def get(self, owner, object_id, req_source='unknown', transaction_id=None):
        """
        Overriding the itoa_object get function.
        The maintenance_window object type is not intended to be 'securable' but we can still
        enforce RBAC filtering.

        How RBAC filtering is done?
        1. Fetch the readable and the existing-but-not-readable services or entities
           which are part of the maintenance window.
        2. For a given service or entity id
            if it is part of accessible_*_by_ids then logged-in user has read access
            if it is part of inaccessible_*_ids then logged-in user doesn't have read access
            if it is part of neither then it is deleted

        @type: string
        @param owner: "owner" user performing the config

        @type object_id: string
        @param object_id: id of object to retrieve

        @type req_source: string
        @param req_source: identified source initiating the operation

        @type transaction_id: string
        @param transaction_id: unique identifier of a user transaction

        @rtype: dictionary
        @return: object matching id on success, with 'can_edit' set when the
            window references at least one object

        @raise ITOAError: when the object is not found
        @raise ItoaAccessDeniedError: when the window references an object the
            user cannot read
        """
        maintenance_object = ItoaObject.get(self, owner, object_id, req_source, transaction_id)
        if not maintenance_object:
            raise ITOAError(status="500", message="Object does not exist.")

        (accessible_services_by_ids, inaccessible_service_ids,
         accessible_entities_by_ids, inaccessible_entity_ids) = self.get_maintenance_objects_accessibility(
            owner, [maintenance_object], transaction_id)

        objects = maintenance_object.get('objects')
        if objects:
            first_object = objects[0]
            if first_object:
                # maintenance object are either all services or either all entities,
                # evaluating the first object is enough.
                maintenance_object_type = first_object.get('object_type')
                maintenance_object["can_edit"] = True

                # NOTE(review): any other object type leaves these two names
                # unbound and the loop below fails - confirm this cannot happen.
                if maintenance_object_type == 'service':
                    accessible_object_by_ids = accessible_services_by_ids
                    inaccessible_object_ids = inaccessible_service_ids
                elif maintenance_object_type == 'entity':
                    accessible_object_by_ids = accessible_entities_by_ids
                    inaccessible_object_ids = inaccessible_entity_ids

                for _object in objects:
                    object_key = _object.get('_key')
                    if object_key in accessible_object_by_ids:
                        if _is_write_denied(accessible_object_by_ids.get(object_key)):
                            maintenance_object["can_edit"] = False
                            # NOTE(review): stopping here means objects listed
                            # after a read-only one are never checked for
                            # inaccessibility - confirm this is intended.
                            break
                    elif object_key in inaccessible_object_ids:
                        raise ItoaAccessDeniedError(
                            'Access denied. You do not have permission to access this object.', logger)

        return maintenance_object

    def do_rbac_filtering(self, owner, sort_key=None, sort_dir=None, filter_data=None, fields=None,
                          skip=None, limit=None, req_source='do_rbac_filtering', transaction_id=None):
        """
        Filter out the maintenance calendar objects the current user may not
        see and flag the remaining ones with 'can_edit'.

        How is this done?
        1. Fetch all the maintenance calendar objects without limit and offset
           ('objects' and 'sec_grp_list' are always requested).
        2. Fetch the readable and the existing-but-not-readable services or entities
           which are part of all the maintenance windows.
        3. A window is kept when one of its 'sec_grp_list' entries matches the
           'sec_grp' of a readable service/entity of the window's object type,
           or when its 'sec_grp_list' is empty.
        4. 'can_edit' is computed from the write permission of the referenced
           objects, then skip/limit are applied on the filtered list.

        @type: string
        @param owner: "owner" user performing the config

        @type sort_key: string
        @param sort_key: string defining keys to sort by

        @type sort_dir: string
        @param sort_dir: string defining direction for sorting - asc or desc

        @type filter_data: dictionary
        @param filter_data: json filter constructed to filter data. Follows mongodb syntax

        @type fields: list
        @param fields: list of fields to retrieve, fetches all fields if not specified.
            The caller's list is not modified.

        @type skip: number
        @param skip: number of items to skip from the start

        @type limit: number
        @param limit: maximum number of items to return, None or 0 for no limit

        @type req_source: string
        @param req_source: identified source initiating the operation

        @type transaction_id: string
        @param transaction_id: unique identifier of a user transaction

        @rtype: list of dictionary
        @return: objects retrieved on success, throws exceptions on errors,
            list of maintenance calendar objects filtered based on rbac
        """
        if fields is not None:
            # work on a copy so the caller's list is left untouched
            fields = list(fields)
            for required_field in ('objects', 'sec_grp_list'):
                if required_field not in fields:
                    fields.append(required_field)

        # get all the maintenance_calendar objects by
        # enforcing skip = None and limit = None to get them all
        maintenance_objects_full_list = ItoaObject.get_bulk(self, owner, sort_key, sort_dir, filter_data,
                                                            fields, None, None, req_source,
                                                            transaction_id)
        if len(maintenance_objects_full_list) == 0:
            return maintenance_objects_full_list

        (accessible_services_by_ids, inaccessible_service_ids,
         accessible_entities_by_ids, _) = self.get_maintenance_objects_accessibility(
            owner, maintenance_objects_full_list, transaction_id)

        # sets give constant-time membership tests in the loops below
        inaccessible_service_ids = set(inaccessible_service_ids)
        services_sec_grp_ids = {service["sec_grp"] for service in accessible_services_by_ids.values()}
        entities_sec_grp_ids = {entity["sec_grp"] for entity in accessible_entities_by_ids.values()}

        maintenance_objects_filtered_list = []
        for maintenance_object in maintenance_objects_full_list:
            # default value
            maintenance_object_type = None
            objects = maintenance_object.get('objects', [])
            if objects:
                first_object = objects[0]
                if first_object:
                    # maintenance object are either all services or either all entities,
                    # evaluating the first object is enough.
                    maintenance_object_type = first_object.get('object_type')

            # default value
            maintenance_object_sec_grp_list = maintenance_object.get('sec_grp_list', [])

            # at least one service/entity for which user have access to is enough to turn this flag True
            if maintenance_object_type == 'service':
                visible_sec_grp_ids = services_sec_grp_ids
            elif maintenance_object_type == 'entity':
                visible_sec_grp_ids = entities_sec_grp_ids
            else:
                visible_sec_grp_ids = ()
            can_see_maintenance_object = any(
                sec_grp in visible_sec_grp_ids for sec_grp in maintenance_object_sec_grp_list)

            if not (can_see_maintenance_object or len(maintenance_object_sec_grp_list) == 0):
                continue

            # at least one service/entity for which user doesn't have access to is enough
            # to turn this flag False
            can_edit_maintenance_object = True
            if maintenance_object_type == 'service':
                for _object in objects:
                    object_key = _object.get('_key')
                    if object_key in accessible_services_by_ids:
                        if not accessible_services_by_ids[object_key]["permissions"]['write']:
                            can_edit_maintenance_object = False
                            break
                    elif object_key in inaccessible_service_ids:
                        can_edit_maintenance_object = False
                        break
            elif maintenance_object_type == 'entity':
                # all entities belong to the Global team, just check one
                first_entity = next(iter(accessible_entities_by_ids.values()), None)
                if first_entity is not None:
                    entity_permission = first_entity.get('permissions')
                    if not entity_permission or not entity_permission.get('write', False):
                        can_edit_maintenance_object = False
            else:
                can_edit_maintenance_object = False

            maintenance_object['can_edit'] = can_edit_maintenance_object
            maintenance_objects_filtered_list.append(maintenance_object)

        skip = int(skip) if skip is not None else 0
        limit = int(limit) if limit is not None else 0
        if limit == 0:
            # no limit requested: return everything after the skipped items
            return maintenance_objects_filtered_list[skip:]
        return maintenance_objects_filtered_list[skip:skip + limit]

    def delete_bulk(self, owner, filter_data=None, req_source='unknown', transaction_id=None):
        """
        Deletes objects matching criteria, if no filtering specified, deletes all objects
        of this object type. Only the maintenance windows whose referenced
        objects are all writable (or deleted) for the current user are removed;
        the others are silently left in place.

        How RBAC filtering is done?
        1. Fetch the readable and the existing-but-not-readable services or entities
           which are part of all the maintenance windows.
        2. For a given service or entity id
            if it is part of accessible_*_by_ids then logged-in user has read access
            if it is part of inaccessible_*_ids then logged-in user doesn't have read access
            if it is part of neither then it is deleted

        @type owner: string
        @param owner: user who is performing this operation

        @type filter_data: dictionary
        @param filter_data: json filter constructed to filter data. Follows mongodb syntax

        @type req_source: string
        @param req_source: identified source initiating the operation

        @type transaction_id: string
        @param transaction_id: unique identifier of a user transaction

        @return: none, throws exceptions on errors
        """
        transaction_id = self._instrumentation.push("MaintenanceCalendar.delete_bulk",
                                                    transaction_id=transaction_id, owner=owner)

        # Get ids for object which is getting deleted
        delete_objects = self.storage_interface.get_all(self.session_key,
                                                        owner,
                                                        self.object_type,
                                                        filter_data=filter_data,
                                                        current_user_name=self.current_user_name,
                                                        fields=['_key', 'acl', 'sec_grp', 'objects'])
        delete_data = []
        if isinstance(delete_objects, list):
            (accessible_services_by_ids, inaccessible_service_ids,
             accessible_entities_by_ids, inaccessible_entity_ids) = self.get_maintenance_objects_accessibility(
                owner, delete_objects, transaction_id)

            for delete_object in delete_objects:
                object_id = delete_object.get('_key')
                deletable = True
                calendar = ItoaObject.get(self, owner, object_id, req_source, transaction_id)
                if not calendar:
                    raise ITOAError(status="500", message="Object does not exist.")

                maintenance_objects = calendar.get('objects')
                maintenance_object_type = maintenance_objects[0]['object_type']
                if maintenance_object_type == 'service':
                    accessible_object_by_ids = accessible_services_by_ids
                    inaccessible_object_ids = inaccessible_service_ids
                elif maintenance_object_type == 'entity':
                    accessible_object_by_ids = accessible_entities_by_ids
                    inaccessible_object_ids = inaccessible_entity_ids

                # check if user has read and write access to all the objects in maintenance window
                for maintenance_object in maintenance_objects:
                    maintenance_object_type = maintenance_object['object_type']
                    object_key = maintenance_object.get('_key')
                    if object_key in accessible_object_by_ids:
                        if _is_write_denied(accessible_object_by_ids.get(object_key)):
                            deletable = False
                            break
                    elif object_key in inaccessible_object_ids:
                        deletable = False
                        logger.debug(
                            'Access denied. Object type: %s. Object id: %s. User: %s',
                            maintenance_object_type, object_key, self.current_user_name)

                if deletable:
                    delete_data.append({'_key': object_id, 'object_type': self.object_type})
        del delete_objects

        if len(delete_data) > 0:
            # Construct filter to only delete objects that user has access for deleting
            deletable_objects_filter = self.get_filter_data_for_keys(
                [entry.get('_key') for entry in delete_data])
            is_delete_needed = True

            security_enforcer = self._get_security_enforcer()
            if self.object_type == security_enforcer.object_type:
                if not (len(delete_data) == 1
                        and delete_data[0].get('_key')
                        == security_enforcer.get_default_itsi_security_group_key()):
                    deletable_objects_filter = self.get_filter_data_for_keys(
                        [entry.get('_key') for entry in delete_data
                         if entry.get('_key') != security_enforcer.get_default_itsi_security_group_key()])
                else:
                    # There is nothing to delete, return
                    is_delete_needed = False
                    logger.debug(
                        'No objects of type %s deleted, request source: %s',
                        self.object_type,
                        req_source
                    )

            if is_delete_needed:
                self.storage_interface.delete_all(
                    self.session_key,
                    owner,
                    self.object_type,
                    filter_data=deletable_objects_filter,
                    current_user_name=self.current_user_name)
                logger.debug(
                    'Objects of type %s deleted, request source: %s',
                    self.object_type,
                    req_source
                )
        # else all objects got filtered out, dont delete any

        self._instrumentation.pop("MaintenanceCalendar.delete_bulk", transaction_id)

    def delete(self, owner, object_id, req_source='unknown', transaction_id=None):
        """
        Delete object by id

        How RBAC filtering is done?
        1. Fetch the readable and the existing-but-not-readable services or entities
           which are part of the maintenance window.
        2. For a given service or entity id
            if it is part of accessible_*_by_ids then logged-in user has read access
            if it is part of inaccessible_*_ids then logged-in user doesn't have read access
            if it is part of neither then it is deleted

        @type owner: string
        @param owner: user who is performing this operation

        @type object_id: string
        @param object_id: id of object to delete

        @type req_source: string
        @param req_source: identified source initiating the operation

        @type transaction_id: string
        @param transaction_id: unique identifier of a user transaction

        @rtype: string
        @return: result of the storage delete call on success, throws exceptions on errors

        @raise ITOAError: status 500 when the object does not exist, status 403
            when a referenced object is read-only or not readable for the user
        """
        transaction_id = self._instrumentation.push("MaintenanceCalendar.delete",
                                                    transaction_id=transaction_id, owner=owner)
        if not is_valid_str(object_id):
            self.raise_error_bad_validation(logger, 'Cannot delete object with invalid object ID.')

        calendar = ItoaObject.get(self, owner, object_id, req_source, transaction_id)
        if not calendar:
            raise ITOAError(status="500", message="Object does not exist.")

        maintenance_objects = calendar.get('objects')
        (accessible_services_by_ids, inaccessible_service_ids,
         accessible_entities_by_ids, inaccessible_entity_ids) = self.get_maintenance_objects_accessibility(
            owner, [calendar], transaction_id)

        maintenance_object_type = maintenance_objects[0]['object_type']
        if maintenance_object_type == 'service':
            accessible_object_by_ids = accessible_services_by_ids
            inaccessible_object_ids = inaccessible_service_ids
        elif maintenance_object_type == 'entity':
            accessible_object_by_ids = accessible_entities_by_ids
            inaccessible_object_ids = inaccessible_entity_ids

        for maintenance_object in maintenance_objects:
            object_key = maintenance_object.get('_key')
            if object_key in accessible_object_by_ids:
                if _is_write_denied(accessible_object_by_ids.get(object_key)):
                    raise ITOAError(status='403', message='Permission denied.')
            elif object_key in inaccessible_object_ids:
                raise ITOAError(status="403", message="Permission denied.")

        results = self.storage_interface.delete(
            self.session_key,
            owner,
            self.object_type,
            object_id,
            current_user_name=self.current_user_name)
        logger.debug('Object of type %s with ID: %s deleted, request source: %s',
                     self.object_type,
                     object_id,
                     req_source
                     )
        self._instrumentation.pop("MaintenanceCalendar.delete", transaction_id)
        return results

    def do_additional_setup(self, owner, objects, req_source='unknown', method=CRUDMethodTypes.METHOD_UPSERT,
                            transaction_id=None, skip_local_failure=False):
        """
        Additional setup performed during edit/create operations.
        Validates the calendar schema (start/end time, objects), enforces write
        access on every referenced object and fills in 'sec_grp_list'.

        How RBAC filtering is done?
        1. Fetch the readable and the existing-but-not-readable services or entities
           which are part of all the maintenance windows.
        2. For a given service or entity id
            if it is part of accessible_*_by_ids then logged-in user has read access
            if it is part of inaccessible_*_ids then logged-in user doesn't have read access
            if it is part of neither then it is deleted

        @type: string
        @param owner: "owner" user performing the config

        @type: list of dict
        @param objects: list of calendars being configured as an array of JSON specifications.
            Each one is updated in place (normalized times, 'sec_grp_list').

        @type: string
        @param req_source: source initiating this operation, for tracking

        @type: string
        @param method: type of CRUD operation being performed

        @type transaction_id: string
        @param transaction_id: unique identifier of a user transaction

        @rtype: None
        @return: None

        @raise ITOAError: status 403 when a referenced object is read-only or
            not readable for the user
        """
        (accessible_services_by_ids, inaccessible_service_ids,
         accessible_entities_by_ids, inaccessible_entity_ids) = self.get_maintenance_objects_accessibility(
            owner, objects, transaction_id)

        def maintenance_object_validation(maintenance_object, expected_maintenance_object_type):
            # NOTE(review): expected_maintenance_object_type is accepted but not
            # compared against the object's own type - confirm this is intended.
            if (
                (not isinstance(maintenance_object, dict))
                or ('_key' not in maintenance_object)
                or ('object_type' not in maintenance_object)
            ):
                self.raise_error_bad_validation(
                    logger,
                    'At least one object specified in invalid. '
                    'Must specify a key and an object type for each object.'
                )

            if not is_valid_str(maintenance_object['_key']):
                self.raise_error_bad_validation(
                    logger,
                    'At least one object specified with invalid key. Must specify a valid key.'
                )

            maintenance_object_type = maintenance_object['object_type']
            if (
                (not is_valid_str(maintenance_object_type))
                or (maintenance_object_type not in SUPPORTED_MAINTENANCE_OBJECT_TYPES)
            ):
                self.raise_error_bad_validation(
                    logger,
                    'Invalid object types specified. Supported object types are: ' + str(
                        SUPPORTED_MAINTENANCE_OBJECT_TYPES
                    )
                )

        for json_data in objects:
            # Assume json_data is valid

            # Validate time fields, expected to be in epoch time format = float
            if (json_data.get('start_time') is None) or (json_data.get('end_time') is None):
                self.raise_error_bad_validation(
                    logger,
                    'Start time and end time are mandatory fields. Must specify both.'
                )

            normalize_num_field(json_data, 'start_time', numclass=float)
            normalize_num_field(json_data, 'end_time', numclass=float)

            start_time = json_data['start_time']
            end_time = json_data['end_time']
            if (
                (not isinstance(start_time, float))
                or (not isinstance(end_time, float))
                or (start_time < 0)
                or (end_time < 0)
            ):
                self.raise_error_bad_validation(
                    logger,
                    'Start time and end time must be valid epoch time. Check the values.'
                )

            if start_time >= end_time:
                self.raise_error_bad_validation(
                    logger,
                    'Start time must be earlier than end time. Check the values.'
                )

            # Validate objects
            maintenance_objects = json_data.get('objects')
            if (not isinstance(maintenance_objects, list)) or (len(maintenance_objects) < 1):
                self.raise_error_bad_validation(
                    logger,
                    'Objects specified must be a valid non-empty list. Must specify at least one object.'
                )

            if not isinstance(maintenance_objects[0], dict):
                self.raise_error_bad_validation(
                    logger,
                    'Objects specified must be a valid non-empty list. Must specify at least one object.'
                )

            if 'object_type' not in maintenance_objects[0]:
                self.raise_error_bad_validation(
                    logger,
                    'Objects specified must be a valid non-empty list. Must specify at least one object.'
                )

            expected_maintenance_object_type = maintenance_objects[0]['object_type']
            if (
                (not is_valid_str(expected_maintenance_object_type))
                or (expected_maintenance_object_type not in SUPPORTED_MAINTENANCE_OBJECT_TYPES)
            ):
                self.raise_error_bad_validation(
                    logger,
                    'Invalid object types specified. Supported object types are: ' + str(
                        SUPPORTED_MAINTENANCE_OBJECT_TYPES
                    )
                )

            # security groups of the referenced objects, in first-seen order
            associated_objects_keys = []

            if expected_maintenance_object_type == 'service':
                accessible_object_by_ids = accessible_services_by_ids
                inaccessible_object_ids = inaccessible_service_ids
            elif expected_maintenance_object_type == 'entity':
                accessible_object_by_ids = accessible_entities_by_ids
                inaccessible_object_ids = inaccessible_entity_ids

            for maintenance_object in maintenance_objects:
                maintenance_object_validation(maintenance_object, expected_maintenance_object_type)

                # We will skip validation of whether objects specified actually exist since stale objects could
                # exist in stale/expired config and are better off being preserved for tracking purposes.
                # Configuring maintenance is also simpler to use this way.
                object_key = maintenance_object.get('_key')
                if object_key in accessible_object_by_ids:
                    fetched_object = accessible_object_by_ids.get(object_key)
                    if _is_write_denied(fetched_object):
                        raise ITOAError(status='403', message='Permission denied.')
                    sec_grp = fetched_object.get('sec_grp')
                    if sec_grp and sec_grp not in associated_objects_keys:
                        associated_objects_keys.append(sec_grp)
                elif object_key in inaccessible_object_ids:
                    raise ITOAError(status="403", message="Permission denied.")

            # len(associated_objects_keys) == 0 should not happen at maintenance_calendar creation
            # but it can happen when creating a maintenance_calendar with services and entities
            # that doesn't exist => test_maintenance_services_interface.py
            if len(associated_objects_keys) == 0:
                json_data['sec_grp_list'] = [self._get_security_enforcer().get_default_itsi_security_group_key()]
            else:
                json_data['sec_grp_list'] = associated_objects_keys