# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import re
import time
import itertools
import http.client

import ITOA.itoa_common as utils
from ITOA.setup_logging import logger
from ITOA.itoa_object import ItoaObject, CRUDMethodTypes
from itsi.objects.itsi_entity_data_drilldown import EntityDataDrilldown
from itsi.objects.itsi_entity_dashboard_drilldown import EntityDashboardDrilldown
from itsi.objects.itsi_entity_discovery_search import ItsiEntityDiscoverySearch
from itsi.objects.itsi_entity_type import ItsiEntityType
from itsi.objects.itsi_import_objects_cache import ItsiBulkImportEntityDimensionCacheManager
from itsi.itsi_utils import ITOAInterfaceUtils
from itsi.itsi_const import ITOAObjConst
from ITOA.itoa_exceptions import ItoaValidationError


class ItsiEntity(ItoaObject):
    """
    Implements ITSI entity
    """

    log_prefix = '[ITSI Entity] '
    COLLECTION_NAME = 'itsi_services'
    ITOA_OBJECT_TYPE = 'entity'
    _entity_internal_keywords = ITOAObjConst.ENTITY_INTERNAL_KEYWORDS
    regex_invalid_chars = re.compile(r'^\$|[=.,"\']+')

    def __init__(self, session_key, current_user_name):
        super(ItsiEntity, self).__init__(
            session_key,
            current_user_name,
            'entity',
            collection_name=self.COLLECTION_NAME,
            is_securable_object=True,
            title_validation_required=False
        )

    def _replace_raw_status(self, owner, entities):
        kv_entity_discovery_search = ItsiEntityDiscoverySearch(self.session_key, self.current_user_name)
        saved_search_execution_times = {
            kv_store_entry['_key']: kv_store_entry['last_execution_time']
            for kv_store_entry in kv_entity_discovery_search.get_bulk(owner)
        }
        for entity in entities:
            if not entity:
                continue
            saved_search_names = list(entity.get('_status', {}).get('breakdown', {}).keys())
            for saved_search_name in saved_search_names:
                search_result = entity['_status']['breakdown'][saved_search_name]
                if saved_search_name in saved_search_execution_times:
                    replacement_time = saved_search_execution_times[saved_search_name]
                else:
                    replacement_time = time.time()
                # for backward compatibility case: { itsi import objects - os : JUST_NOW }
                if type(search_result) is not dict and search_result == 'JUST_NOW':
                    entity['_status']['breakdown'][saved_search_name] = replacement_time
                # for new format: { itsi import objects - os : {refresh_time: 'JUST_NOW', entity_status_tracking: 0} }
                elif type(search_result) is dict and 'refresh_time' in search_result:
                    if 'refresh_time_raw' in search_result:
                        # already got the raw value in the object, proceed with replacement
                        if search_result['refresh_time_raw'] == 'JUST_NOW':
                            search_result['refresh_time'] = replacement_time
                        else:
                            search_result['refresh_time'] = search_result['refresh_time_raw']
                    else:
                        # refresh_time_raw not there, let's save a copy from refresh_time
                        search_result['refresh_time_raw'] = search_result['refresh_time']  # save the original value
                        if search_result['refresh_time'] == 'JUST_NOW':
                            search_result['refresh_time'] = replacement_time
        return entities
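
    # Illustrative shapes handled by _replace_raw_status above (a sketch based on
    # the comments in that method; the search title is hypothetical):
    #   legacy format: {'_status': {'breakdown': {'itsi import objects - os': 'JUST_NOW'}}}
    #   new format:    {'_status': {'breakdown': {'itsi import objects - os':
    #                       {'refresh_time': 'JUST_NOW', 'entity_status_tracking': 0}}}}
    # In both cases 'JUST_NOW' is replaced with the discovery search's
    # last_execution_time, falling back to time.time() when no record exists.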

    def get(self, owner, object_id, req_source='unknown', transaction_id=None, replace_raw_status=True):
        result = super(ItsiEntity, self).get(owner, object_id, req_source, transaction_id)
        if replace_raw_status:
            return self._replace_raw_status(owner, [result])[0]
        return result

    def get_bulk(
            self,
            owner,
            sort_key=None,
            sort_dir=None,
            filter_data=None,
            fields=None,
            skip=None,
            limit=None,
            req_source='unknown',
            transaction_id=None,
            replace_raw_status=True
    ):
        results = super(ItsiEntity, self).get_bulk(owner, sort_key, sort_dir, filter_data, fields,
                                                   skip, limit, req_source, transaction_id)
        if replace_raw_status:
            return self._replace_raw_status(owner, results)
        return results

    def do_object_validation(self, owner, objects, validate_name=True, dupname_tag=None,
                             transaction_id=None, skip_local_failure=False):
        ItoaObject.do_object_validation(self, owner, objects, validate_name, dupname_tag,
                                        transaction_id, skip_local_failure)
        # We do not want to validate uniqueness for entities, but we still want valid titles.
        # The title_validation_required flag is too broad, so that logic is copied here.
        # It would be nice if this could be cleaned up at some point.
        for json_data in objects:
            if not utils.is_valid_name(json_data.get('title')):
                self.raise_error_bad_validation(
                    logger,
                    ('Invalid title specified for the object_type: %s.'
                     ' Cannot be empty and cannot'
                     ' contain = " or \'.') % self.object_type
                )

    def _validate_identifier_and_info_field_names(self, field_name):
        """
        Validate alias and info fields

        Guard against usage of problematic field names:
        - Disallow field names starting with $ since KV store lookups (MongoDB) don't
          support field names starting with a $
        - Disallow fields containing . since KV store lookups (MongoDB) don't support
          field names with dots
        - Disallow " and ' which are used for escaping field names - preventing these
          allows searches to escape easily
        - Disallow internal keywords from being used as alias and informational fields.
          The internal keywords are listed in the _entity_internal_keywords list
        - UI controls like MultiInputControl use comma as a separator to specify
          multiple fields, so disallow commas
        - Disallow the = character since SPL does not seem to support it fully, e.g.
          index=_internal | eval 'field='="value" fails on syntax

        @type field_name: basestring
        @param field_name: alias or info field name

        @return: None
        """
        if not utils.is_valid_str(field_name):
            self.raise_error_bad_validation(
                logger,
                'Field name is required.'
            )
        if re.search(self.regex_invalid_chars, field_name):
            self.raise_error_bad_validation(
                logger,
                ('Invalid field name specified: {}. '
                 'Fields cannot contain special characters not supported by SPL.').format(field_name)
            )
        if field_name in self._entity_internal_keywords:
            self.raise_error_bad_validation(
                logger,
                'Invalid field name specified: {0}. Field cannot be an internal keyword: {1}.'
                .format(field_name, str(self._entity_internal_keywords))
            )
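
    # Illustrative results of the validation above (regex: ^\$|[=.,"']+), using
    # hypothetical field names:
    #   accepted: 'host', 'ip_address', 'os-version'
    #   rejected: '$host' (leading $), 'os.version' (dot), 'a,b' (comma),
    #             'a=b' (equals), 'a"b' and "a'b" (quotes)
    # Names matching entries in ITOAObjConst.ENTITY_INTERNAL_KEYWORDS are also
    # rejected.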

    def _populate_identifier_and_info_fields_blob(self, entity):
        """
        Always populate identifier.values and informational.values again, by going through
        the fields in identifier.fields and informational.fields respectively and finding
        their values at top-level in the entity object.

        @type entity: dict
        @param entity: entity object

        @return: None
        """
        field_types = ['identifier', 'informational']
        for field_type in field_types:
            field_blob = entity.get(field_type, {})
            if not isinstance(field_blob, dict):
                logger.warning('Incorrect format of %s field in entity object. Resetting it to empty dictionary. '
                               'entity_title="%s"' % (field_type, entity.get('title')))
                field_blob = {}
            if 'fields' not in field_blob or not isinstance(field_blob['fields'], list):
                field_blob['fields'] = []
            field_blob['values'] = []

            # Alias fields and their values are present in two places in the entity object:
            # at top level as alias field and value, and in the identifier.fields and
            # identifier.values lists. To maintain consistency across the entity object,
            # always clean up identifier.values and re-populate it by going through
            # identifier.fields and getting values for those fields from the top-level
            # alias fields in the entity object. If alias field values are not present in
            # the identifier.values list, lookup of an entity by alias field value would
            # return null.
            # Perform the same handling for informational fields as well, as they follow
            # the same structure as the identifier field. For more info, check ITSI-356.
            for field_name in field_blob.get('fields', []):
                self._validate_identifier_and_info_field_names(field_name)
                if field_name not in entity:
                    self.raise_error_bad_validation(
                        logger,
                        ('%s field specified in %s.fields attribute, not found in entity object. '
                         'entity_title="%s", %s_field="%s"') %
                        (field_type, field_type, entity.get('title'), field_type, field_name)
                    )
                else:
                    if not isinstance(entity.get(field_name), list):
                        self.raise_error_bad_validation(
                            logger,
                            ('Incorrect format of %s field, in entity object. Expected list, found %s, '
                             'entity_title="%s", %s_field="%s"') %
                            (field_type, type(entity.get(field_name)).__name__,
                             entity.get('title'), field_type, field_name)
                        )
                    # In order to enable KV store case-insensitive matching we need to
                    # convert identifier/info values to lower case
                    for field_value in entity.get(field_name):
                        if field_value.lower() not in field_blob.get('values'):
                            field_blob['values'].append(field_value.lower())
            entity[field_type] = field_blob

    def _populate_identifier_lookups_field(self, entity):
        """
        Populate the entity field that is used for KV store lookups

        @type entity: dict
        @param entity: the entity object data
        """
        identifier = entity.get(ITOAObjConst.IDENTIFIER, {}).get('fields', [])
        if not isinstance(identifier, list):
            return
        identifier_lookups = []
        for field in identifier:
            values = entity.get(field, [])
            # Create strings that can be used as lookups by including both the
            # identifier field name and value (e.g., field=value). Ensure that
            # everything is lowercased for case-insensitive lookups.
            field_lookups = ['{}={}'.format(field, value).lower() for value in values]
            identifier_lookups.extend(field_lookups)
        entity[ITOAObjConst.ENTITY_IDENTIFIER_LOOKUPS] = list(set(identifier_lookups))

    def _populate_informational_lookups_field(self, entity):
        """
        Populate the entity informational field that is used for KV store lookups

        @type entity: dict
        @param entity: the entity object data
        """
        informational = entity.get(ITOAObjConst.INFORMATIONAL, {}).get('fields', [])
        if not isinstance(informational, list):
            return
        informational_lookups = []
        for field in informational:
            values = entity.get(field, [])
            # Create strings that can be used as lookups by including both the
            # informational field name and value (e.g., field=value). Ensure that
            # everything is lowercased for case-insensitive lookups.
            field_lookups = ['{}={}'.format(field, value).lower() for value in values]
            informational_lookups.extend(field_lookups)
        entity[ITOAObjConst.ENTITY_INFORMATIONAL_LOOKUPS] = list(set(informational_lookups))

    def _populate_status_lookups_field(self, entity):
        """
        Populate the entity status field that is used for KV store lookups

        @type entity: dict
        @param entity: the entity object data
        """
        combined_status = entity.get(ITOAObjConst.STATUS, {}).get('combined', 'N/A')
        breakdown = entity.get(ITOAObjConst.STATUS, {}).get('breakdown', {})
        entity[ITOAObjConst.ENTITY_STATUS_LOOKUPS] = ['combined={}'.format(combined_status).lower()]
        for recurring_import_search_title, refresh_time in breakdown.items():
            entity[ITOAObjConst.ENTITY_STATUS_LOOKUPS].append('{}={}'.format(
                recurring_import_search_title, refresh_time
            ).lower())
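
    # Illustrative input/output for the lookup-population methods above, assuming a
    # hypothetical entity with identifier.fields=['host'] and host=['Web01', 'WEB02']:
    #   entity[ITOAObjConst.ENTITY_IDENTIFIER_LOOKUPS] -> ['host=web01', 'host=web02']
    # Informational and status lookups follow the same lowercased 'field=value'
    # convention, e.g. 'combined=n/a' when no combined status has been computed.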

    def _check_and_update_retirable_flag(self, owner, updated_entity, transaction_id):
        if updated_entity.get('retirable', 0) == 1:
            current_entity = self.get(owner, updated_entity.get('_key'),
                                      transaction_id=transaction_id, replace_raw_status=False)
            if current_entity:
                current_entity_type_ids = current_entity.get('entity_type_ids', [])
                updated_entity_type_ids = updated_entity.get('entity_type_ids', [])
                if (len(updated_entity_type_ids) != len(current_entity_type_ids)) \
                        or (set(current_entity_type_ids) != set(updated_entity_type_ids)):
                    updated_entity['retirable'] = 0
                    updated_entity['retiring_policy'] = []

    def do_additional_setup(self, owner, objects, req_source='unknown',
                            method=CRUDMethodTypes.METHOD_UPSERT, transaction_id=None,
                            skip_local_failure=False):
        for json_data in objects:
            # Assume json_data is valid
            self._populate_identifier_and_info_fields_blob(json_data)
            self._populate_identifier_lookups_field(json_data)
            self._populate_informational_lookups_field(json_data)
            self._populate_status_lookups_field(json_data)
            self._check_and_update_retirable_flag(owner, json_data, transaction_id)

            # Keep itsi_event_name in sync with the title
            if 'itsi_event_name' not in json_data or json_data.get('title') not in json_data['itsi_event_name']:
                json_data['itsi_event_name'] = [json_data.get('title')]

            # Validate there are no common fields between identifier and info fields
            identifier_fields_set = set(json_data.get('identifier', {}).get('fields', []))
            info_fields_set = set(json_data.get('informational', {}).get('fields', []))
            common_fields_set = identifier_fields_set.intersection(info_fields_set)
            if common_fields_set:
                self.raise_error_bad_validation(
                    logger,
                    ('Invalid informational field(s) specified for entity. Some of the info fields conflict with '
                     'identifier fields. entity_title="%s". conflicting_info_fields="%s"') %
                    (json_data.get('title'), list(common_fields_set))
                )
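
    # Illustrative outcome of the conflict check in do_additional_setup above, with
    # hypothetical field names: an entity declaring identifier.fields=['host'] and
    # informational.fields=['host', 'os'] is rejected, since 'host' appears in both
    # sets; informational.fields=['os'] alone would pass.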

    def identify_dependencies(self, owner, objects, method, req_source='unknown',
                              transaction_id=None, skip_local_failure=False):
        persisted_entities = []
        if ((method == CRUDMethodTypes.METHOD_UPDATE) or
                (method == CRUDMethodTypes.METHOD_UPSERT) or
                (method == CRUDMethodTypes.METHOD_DELETE)):
            # if delete, then we know objects are real persisted objects, no need to fetch again
            if method == CRUDMethodTypes.METHOD_DELETE:
                persisted_entities = objects
            else:
                persisted_entities = self.get_persisted_objects_by_id(
                    owner,
                    object_ids=[entity.get('_key') for entity in objects],
                    req_source=req_source
                )

        entities_needing_service_refresh = set()
        is_services_associated_by_rules_need_refresh = False
        force_service_refresh = set()
        retired_or_deleted_entities = []

        for entity in objects:
            # Assume entity is a valid json
            if method == CRUDMethodTypes.METHOD_CREATE:
                if not utils.is_valid_str(entity.get('_key')):
                    # Generate a key here on create to facilitate setting up refresh objects
                    entity['_key'] = ITOAInterfaceUtils.generate_backend_key()

            if method == CRUDMethodTypes.METHOD_DELETE:
                retired_or_deleted_entities.append(entity['_key'])
                old_ent = self.get(owner, entity['_key'], req_source=req_source, transaction_id=transaction_id)
                services = old_ent.get('services')
                if services is not None and len(services) > 0:
                    for s in services:
                        if not isinstance(s, dict):
                            logger.error('Invalid services structure received - skipping entity=%s', old_ent)
                            continue
                        service_key = s.get('_key')
                        if service_key:
                            force_service_refresh.add(service_key)

            # First identify if relevant fields have changed.
            # If an entity is created or deleted, all services in the system may need to be updated.
            # If identifier and informational fields have changed, then all services in the system
            # may need to be updated.
            # Services need to be updated since the rules in the services may need to include/exclude
            # the entity with new settings, which affects KPI searches.
            is_identifiers_changed = False
            is_entity_created_or_deleted = False
            entities_became_active = False
            entities_became_retired = False
            entities_type_changed = False

            if ((method == CRUDMethodTypes.METHOD_CREATE) or
                    (method == CRUDMethodTypes.METHOD_DELETE)):
                is_entity_created_or_deleted = True
            elif (method == CRUDMethodTypes.METHOD_UPDATE) or (method == CRUDMethodTypes.METHOD_UPSERT):
                entity_identifier = entity.get('identifier', {})
                entity_informational = entity.get('informational', {})
                entity_type = entity.get('entity_type_ids', [])
                is_retired = utils.normalize_bool_flag(entity.get('retired', 0))
                # We do not need to address situations where the only change was the entity becoming "retirable"
                entity_found = False
                for persisted_entity in persisted_entities:
                    if entity['_key'] == persisted_entity['_key']:
                        entity_found = True
                        # First identify if identifier and informational fields have changed
                        persisted_entity_type = persisted_entity.get('entity_type_ids', [])
                        persisted_identifier = persisted_entity.get('identifier', {})
                        persisted_informational = persisted_entity.get('informational', {})
                        persisted_retired = utils.normalize_bool_flag(persisted_entity.get('retired', 0))

                        if len(entity_identifier) != len(persisted_identifier):
                            is_identifiers_changed = True
                        else:
                            if not (utils.is_equal_lists(
                                    entity_identifier.get('values', []),
                                    persisted_identifier.get('values', [])
                            )):
                                is_identifiers_changed = True
                            if not (utils.is_equal_lists(
                                    entity_identifier.get('fields', []),
                                    persisted_identifier.get('fields', [])
                            )):
                                is_identifiers_changed = True
                            if not (utils.is_equal_lists(
                                    entity_informational.get('values', []),
                                    persisted_informational.get('values', [])
                            )):
                                is_identifiers_changed = True
                            if not (utils.is_equal_lists(
                                    entity_informational.get('fields', []),
                                    persisted_informational.get('fields', [])
                            )):
                                is_identifiers_changed = True
                            if not (utils.is_equal_lists(
                                    entity_type,
                                    persisted_entity_type
                            )):
                                is_identifiers_changed = True

                        if is_retired != persisted_retired:
                            # User retired the entity
                            if is_retired:
                                logger.debug('Entity with id {} is being retired. Adding to refresh job.'
                                             .format(persisted_entity['_key']))
                                entities_became_retired = True
                                retired_or_deleted_entities.append(entity['_key'])
                                services = persisted_entity.get('services')
                                if services is not None and len(services) > 0:
                                    for s in services:
                                        if not isinstance(s, dict):
                                            logger.error('Invalid service - skipping entity=%s', persisted_entity)
                                            continue
                                        service_key = s.get('_key')
                                        if service_key:
                                            force_service_refresh.add(service_key)
                            # User performed an un-retire action
                            else:
                                # We are treating unretirement as if a new entity was created
                                entities_became_active = True
                                logger.debug('Entity with id {} is being unretired. Adding to refresh job to'
                                             ' regenerate service/entity associations.'
                                             .format(persisted_entity['_key']))
                        break
                # If the entity was not found, then it must have been created by either the UPDATE or UPSERT operation
                is_entity_created_or_deleted = (not entity_found)
            else:
                raise AttributeError('Invalid method name {0} received'.format(method))

            if is_entity_created_or_deleted:
                # If the entity is created or deleted, related services need updates
                is_identifiers_changed = True
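
            # Illustrative change-detection outcome (hypothetical values): an update
            # that changes entity_type_ids from ['unix_host'] to ['windows_host']
            # fails the is_equal_lists comparison above, so is_identifiers_changed
            # becomes True and the entity is queued for an 'entity_services_update'
            # refresh job below.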

            # Based on which relevant fields have changed, identify dependencies to update.
            # For 4.11.x onwards, we also determine which entities need to be updated based on whether they were retired
            if is_identifiers_changed or entities_became_retired or entities_became_active:
                if entities_became_active or entities_became_retired:
                    logger.debug('Entity lifecycle management operation occurred.'
                                 ' Refresh job created to update entities.')
                # Add a refresh request to update the services for entity membership
                if '_key' in entity:
                    entities_needing_service_refresh.add(entity['_key'])
                # Services will need to be updated if entity was created/deleted OR if entities became active/retired
                is_services_associated_by_rules_need_refresh = (is_identifiers_changed or
                                                                entities_became_active or
                                                                entities_became_retired or
                                                                entities_type_changed)

        is_retiring_entities = entities_became_retired
        is_unretiring_entities = entities_became_active

        required_refresh_jobs = []
        if len(entities_needing_service_refresh) > 0:
            required_refresh_jobs.extend([
                self.get_refresh_job_meta_data(
                    'entity_services_update',
                    list(entities_needing_service_refresh),
                    self.object_type,
                    change_detail={
                        'is_services_associated_by_rules_need_refresh':
                            is_services_associated_by_rules_need_refresh,
                        'deleted_entity_services': list(force_service_refresh),
                        'is_retiring_entities': is_retiring_entities,
                        'is_unretiring_entities': is_unretiring_entities
                    },
                    transaction_id=transaction_id)
            ])
        if retired_or_deleted_entities and \
                utils.is_feature_enabled('itsi-high-scale-at', self.session_key) and \
                utils.is_feature_enabled('itsi-entity-level-adaptive-thresholding', self.session_key):
            required_refresh_jobs.append(self.get_refresh_job_meta_data(
                'kpi_entity_thresholds_deletion',
                retired_or_deleted_entities,
                self.object_type,
                transaction_id=transaction_id
            ))
        return len(required_refresh_jobs) > 0, required_refresh_jobs

    def get_entity_types_for_entity(self, entity):
        """
        Get all the entity types associated with this entity

        :param entity: dict
        :return: list of entity_types
        """
        entity_type_ids = entity.get('entity_type_ids', [])
        if len(entity_type_ids) == 0:
            return []
        entity_type_ids_filter = {
            '$or': [{'_key': entity_type_id} for entity_type_id in entity_type_ids]
        }
        return ItsiEntityType(
            self.session_key,
            self.current_user_name
        ).get_bulk(
            self.current_user_name,
            filter_data=entity_type_ids_filter
        )
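
    # Illustrative filter built by get_entity_types_for_entity above, assuming a
    # hypothetical entity with entity_type_ids=['linux_host', 'windows_host']:
    #   {'$or': [{'_key': 'linux_host'}, {'_key': 'windows_host'}]}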

    def get_entity_data_drilldown_filter_for_entity(self, entity_id):
        """
        Get the entity data drilldown filters for the entity specified by entity_id,
        based on the data drilldowns associated with its entity types

        @type entity_id: str
        @param entity_id: _key of the entity

        @rtype: list
        @return: list of entity data drilldown filters
        """
        entity = self.get(self.current_user_name, entity_id)
        if entity is None:
            self.raise_error(logger, 'Entity with id "%s" not found' % entity_id,
                             status_code=http.client.NOT_FOUND)
        entity_types = self.get_entity_types_for_entity(entity)
        if len(entity_types) == 0:
            return []
        data_drilldown_filters = []
        for entity_type in entity_types:
            for data_drilldown in entity_type.get('data_drilldowns', []):
                drilldown_obj = EntityDataDrilldown(**data_drilldown)
                data_filter = drilldown_obj.build_data_drilldown_filter(entity)
                data_drilldown_filters.append(data_filter)
        return data_drilldown_filters

    def get_entity_dashboard_drilldown_url_params_for_entity(self, entity_id):
        """
        Get the entity dashboard drilldowns for the entity specified by entity_id,
        based on the dashboard drilldowns associated with its entity types

        @type entity_id: str
        @param entity_id: _key of the entity

        @rtype: list
        @return: list of entity dashboard drilldowns
        """
        entity = self.get(self.current_user_name, entity_id)
        if entity is None:
            self.raise_error(logger, 'Entity with id "%s" not found' % entity_id,
                             status_code=http.client.NOT_FOUND)
        entity_types = self.get_entity_types_for_entity(entity)
        if len(entity_types) == 0:
            return []
        dashboard_drilldown_url_params = []
        for entity_type in entity_types:
            for dashboard_drilldown in entity_type.get('dashboard_drilldowns', []):
                drilldown_obj = EntityDashboardDrilldown(self.session_key, **dashboard_drilldown)
                url_params = drilldown_obj.build_url_params(entity)
                dashboard_drilldown_url_params.append(url_params)
        return dashboard_drilldown_url_params

    def post_save_setup(self, owner, ids, objects, req_source='unknown',
                        method=CRUDMethodTypes.METHOD_UPSERT, transaction_id=None,
                        skip_local_failure=False):
        """
        Performs additional operations after an entity is created/updated

        We must invalidate the bulk import cache when an entity is created/updated via
        methods other than *bulk imports*. This prevents the bulk import flow from
        incorrectly filtering out entities that may have been modified in the entity
        store but not in the cache.

        We must also invalidate the cache during non-recurring bulk import, and for
        recurring bulk import when the update method is 'replace'. This is handled in
        itsi_bulk_import.py

        For more details, see https://confluence.splunk.com/display/ITOA/Arch+Runway+-+Entity+Scalability+Improvement+for+Bulk+Import+and+Refresh+Queue+Jobs
        """
        if req_source != 'load_csv':
            ItsiBulkImportEntityDimensionCacheManager(self.session_key, owner).invalidate_cache()
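
    # Illustrative behavior of the req_source guard above (a sketch, assuming the
    # bulk import flow saves with req_source='load_csv'): such saves skip cache
    # invalidation because itsi_bulk_import.py manages the cache itself; any other
    # req_source, e.g. the default 'unknown', invalidates the dimension cache.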

    def post_delete(self, owner, ids, req_source='unknown',
                    method=CRUDMethodTypes.METHOD_UPSERT, transaction_id=None):
        """
        Performs additional operations after an entity is deleted

        Whenever an entity is deleted, we must invalidate the bulk import metadata. This
        is to avoid the next bulk import incorrectly filtering out entities that may be
        present in the cache but no longer present in the entity store. By deleting the
        'useCache' flag, we ensure that the next run of bulk import ignores the cache
        filter and imports entities
        """
        ItsiBulkImportEntityDimensionCacheManager(self.session_key, owner).invalidate_cache()


#  ___     _   _ _          ___      _ _     _____               _
# | __|_ _| |_(_) |_ _  _  | _ )_  _| | |__ |_   _|_ _ __ _ __ _(_)_ _  __ _
# | _|| ' \  _| |  _| || | | _ \ || | | / /   | |/ _` / _` / _` | | ' \/ _` |
# |___|_||_\__|_|\__|\_, | |___/\_,_|_|_\_\   |_|\__,_\__, \__, |_|_||_\__, |
#                    |__/                             |___/|___/       |___/

TAGTYPES = ['informational', 'identifier']


def _update_entities(entities, new_common_tagset, logger, tagtype='informational'):
    """
    Analyzes the requested changes against the requested entities, and either returns
    updated entities or throws an exception

    @param entities: A list of itsi_entity objects
    @param new_common_tagset: An object consisting of requested updates
           - fields: A list of unique fieldnames (type: string) to be added or changed
           - values: A list of unique values (type: string) found in the update
           - attributes: An object of { field: [values] }
    @param logger: A splunk logger instance (opaque)
    @param tagtype: The tagset to change (type: string, "informational" or "identifier")
    @return: A list of itsi_entity objects to be batch saved
    """
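    # Illustrative (hypothetical) new_common_tagset payload consistent with the
    # validations below:
    #   {
    #       'fields': ['env', 'team'],
    #       'values': ['prod', 'sre'],
    #       'attributes': {'env': ['prod'], 'team': ['sre']}
    #   }
    # fields must be unique, the attributes keys must match fields exactly, and the
    # flattened attribute values must equal the values list, with no blank values.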
    # Using set() creates an iterable of all unique fieldnames. If the list of unique
    # fieldnames is not the same length as the list of submitted fieldnames, then there
    # must have been a duplicate fieldname.
    keys_to_update = set(new_common_tagset['fields'])
    if len(keys_to_update) != len(new_common_tagset['fields']):
        raise ItoaValidationError('Integrity failure: Duplicate field names.', logger)
    if set(new_common_tagset['attributes'].keys()) != set(keys_to_update):
        raise ItoaValidationError('Data corruption: field names and attribute names do not match.', logger)
    if keys_to_update.intersection(ITOAObjConst.ENTITY_INTERNAL_KEYWORDS):
        raise ItoaValidationError('Integrity failure: Attempt to overwrite reserved word.', logger)

    attribute_values = set(itertools.chain.from_iterable(list(new_common_tagset['attributes'].values())))
    if "" in attribute_values:
        raise ItoaValidationError('Data corruption: blank values are not permitted.', logger)
    if attribute_values != set(new_common_tagset['values']):
        raise ItoaValidationError('Integrity failure: Values collections are not congruent.', logger)

    def checkValuesAreAlsoEqual(fieldname):
        """
        For entities in this scope, determine whether they all share the same value.

        @param fieldname: the fieldname to compare across entities.
        """
        try:
            values = iter([set([s.lower() for s in entity[fieldname]]) for entity in entities])
            first = next(values)
            return all(first == rest for rest in values)
        except StopIteration:
            return True

    # The intersection of all fieldnames of a tagtype for all entities is the set of
    # common fieldnames.
    tag_fieldname_sets = [set(entity[tagtype]['fields']) for entity in entities]
    common_tag_nameset = set([fieldname
                              for fieldname in tag_fieldname_sets[0].intersection(*tag_fieldname_sets)
                              if checkValuesAreAlsoEqual(fieldname)])

    # Generate a list of all fieldname sets in the INFORMATIONAL and IDENTIFIER sets
    # in all the entities requested.
    other_fieldnames = [set(entity[other_tagtype]['fields'])
                        for entity in entities
                        for other_tagtype in TAGTYPES]
    if other_fieldnames:
        # Union of all other_fieldname sets creates a unique set of all fieldnames
        # for existing entities.
        all_fieldnames = other_fieldnames[0].union(*other_fieldnames)
        # Difference with common_tag_nameset produces a set of all fieldnames NOT in
        # common for the existing entities.
        other_fieldnames = all_fieldnames.difference(common_tag_nameset)
        # A non-empty intersection with keys_to_update indicates a fieldname in the
        # list of fields the user cannot update: either because it's an identifier,
        # or it's not held in common, for existing entities.
        other_fieldname_overlap = other_fieldnames.intersection(keys_to_update)
        if other_fieldname_overlap:
            warning = 'Integrity Failure: Attempt to overwrite restricted set key. Overlap: {}'
            raise ItoaValidationError(warning.format(other_fieldname_overlap), logger)

    fields_to_delete = common_tag_nameset - keys_to_update
    fields_to_add = keys_to_update - common_tag_nameset
    fields_to_change = common_tag_nameset - fields_to_delete

    for entity in entities:
        for field in fields_to_delete:
            entity.pop(field)
            pos = entity[tagtype]['fields'].index(field)
            del entity[tagtype]['fields'][pos]
        for field in fields_to_change:
            entity[field] = new_common_tagset['attributes'][field]
        for field in fields_to_add:
            entity[field] = new_common_tagset['attributes'][field]
            entity[tagtype]['fields'].append(field)
        for k in ['values', 'fields']:
            entity[tagtype][k] = list(set(new_common_tagset[k] + entity[tagtype][k]))
    return entities


def bulk_entity_update_tags(session_key, current_user, owner, data, logger):
    """
    Given a payload of entities and information fields, update the entities.

    @param session_key: The user's session key (opaque)
    @param current_user: The current user (opaque)
    @param owner: the owner of the object set (opaque)
    @param data: data sent by the client
           An object of:
           - entities: A list of entities to be updated
           - update: A data object representing the update delta
    @param logger: Our splunk logging instance. (opaque)
    @return: List of IDs of the itsi_entity objects saved.
    """
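    # Illustrative (hypothetical) request payload for this function:
    #   data = {
    #       'entities': [{'_key': 'entity-1', 'mod_timestamp': '...'}],
    #       'update': {'fields': ['env'], 'values': ['prod'],
    #                  'attributes': {'env': ['prod']}}
    #   }
    # The 'update' object is passed through to _update_entities above.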
    if ('entities' not in data) or ('update' not in data):
        warning = 'Data error. `entities` and `update` are mandatory keys. Received: {}'
        logger.error(warning.format(data))
        raise ItoaValidationError('No data received.', logger)

    if len(data['entities']) == 0:
        warning = 'At least one entity must be supplied for editing. Received: {}'
        logger.error(warning.format(data))
        raise ItoaValidationError('No entities supplied for editing.', logger)

    sent_entities_lookup = {entity['_key']: entity for entity in data['entities']}
    logger.debug('itsi_entity.bulk_entity_update_tags -- sent_entities_lookup={}'.format(sent_entities_lookup))
    filter_data = {'$or': [{'_key': key} for key in list(sent_entities_lookup.keys())]}
    entities_handle = ItsiEntity(session_key, current_user)
    matching_entities = entities_handle.get_bulk(owner, filter_data=filter_data)
    if len(matching_entities) != len(list(sent_entities_lookup.keys())):
        raise ItoaValidationError('Update race: One or more entities deleted during editing.'
                                  ' Try refreshing the page.', logger)

    for entity in matching_entities:
        # Entities are dicts, so check membership rather than hasattr
        if 'mod_timestamp' in entity \
                and sent_entities_lookup[entity['_key']]['mod_timestamp'] != entity['mod_timestamp']:
            raise ItoaValidationError('Update race: One or more entities changed during editing.'
                                      ' Try refreshing the page.', logger)

    updated_entities = _update_entities(matching_entities, data['update'], logger)
    entities_handle = ItsiEntity(session_key, current_user)
    return entities_handle.save_batch(owner, updated_entities, True, req_source='bulk_entity_update')


def manage_entity_retirement(session_key, current_user, owner, data, logger, retire_entities=False):
    '''
    Given a payload of entity keys, set the retired flag for those entities: to 1 when
    retire_entities is True, otherwise to 0 (un-retire).

    @param session_key: The user's session key (opaque)
    @param current_user: The current user (opaque)
    @param owner: the owner of the object set (opaque)
    @param data: data sent by the client - a list of entity _keys to be updated
    @param logger: Our splunk logging instance. (opaque)
    @return: List of IDs of the itsi_entity objects saved.
    '''
    if len(data) == 0:
        warning = 'At least one entity must be supplied for retirement. Received: {}'
        logger.error(warning.format(data))
        raise ItoaValidationError('No entities supplied for retirement.', logger)

    filter_data = {'$or': [{'_key': key} for key in data]}
    entities_handle = ItsiEntity(session_key, current_user)
    # We only need the retirement-related fields
    retirable_entities = entities_handle.get_bulk(owner, filter_data=filter_data,
                                                  fields=['title', '_key', 'retired', 'retirable'])
    if len(retirable_entities) != len(data):
        raise ItoaValidationError(('One or more entities potentially deleted during retirement action. Try refreshing '
                                   'the page.'), logger)

    logger.debug('Entities with keys {} will be retired.'.format(data))
    for entity in retirable_entities:
        # Set the retired field and save. identify_dependencies will update the data
        # and send it to the refresh queue
        if retire_entities:
            entity['retired'] = 1
            entity['retirable'] = 0
        else:
            entity['retired'] = 0
            entity['retirable'] = 0
    return entities_handle.save_batch(owner, retirable_entities, True,
                                      req_source='entity_retirement', is_partial_data=True)
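
# Illustrative (hypothetical) calls for the retirement helper above:
#   manage_entity_retirement(session_key, user, owner, ['entity-1', 'entity-2'],
#                            logger, retire_entities=True)   # retire both entities
#   manage_entity_retirement(session_key, user, owner, ['entity-1'],
#                            logger, retire_entities=False)  # un-retire one entity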

def manage_retirable_entities(session_key, current_user, owner, logger, give_count=False):
    '''
    Retire all entities currently marked retirable: set the retired flag to 1 for every
    entity with retirable=1 that is not already retired.

    @param session_key: The user's session key (opaque)
    @param current_user: The current user (opaque)
    @param owner: the owner of the object set (opaque)
    @param logger: Our splunk logging instance. (opaque)
    @param give_count: When True, return the list of retirable entities instead of saving
    @return: List of IDs of the itsi_entity objects saved.
    '''
    entities_handle = ItsiEntity(session_key, current_user)
    retirable_filter = {'$and': [{'retired': {'$ne': 1}}, {'retirable': 1}]}
    retirable_entities = entities_handle.get_bulk(
        owner,
        filter_data=retirable_filter,
        fields=['_key', 'title', 'retirable'])
    logger.debug('Total number of Retirable Entities is {}'
                 .format(len(retirable_entities)))
    if give_count:
        return retirable_entities
    else:
        if len(retirable_entities) == 0:
            return []
        else:
            for entity in retirable_entities:
                entity['retired'] = 1
                entity['retirable'] = 0
            return entities_handle.save_batch(owner, retirable_entities, True,
                                              req_source='entity_retirement', is_partial_data=True)


def _build_dimensions_summary(entities):
    dimensions_summary = dict()
    dedupe_cache = set()
    for entity in entities:
        field_type_fields_map = {
            'alias': entity['identifier']['fields'],
            'info': entity['informational']['fields']
        }
        for field_type, fields in field_type_fields_map.items():
            for key in fields:
                if key not in dimensions_summary:
                    dimensions_summary[key] = []
                for value in entity[key]:
                    dedupe_cache_key = key + value + field_type
                    if dedupe_cache_key not in dedupe_cache:
                        dimensions_summary[key].append({'value': value, 'field_type': field_type})
                        dedupe_cache.add(dedupe_cache_key)
    return dimensions_summary


def get_dimensions_summary(session_key, current_user, owner):
    """
    Get a summary of all dimensions across all entities

    @param session_key: The user's session key
    @param current_user: The current user
    @param owner: the owner of the object set
    @return: a dict of all dimensions in the following format
        {
            <dimension_name>: [
                {'value': <dimension_value>, 'field_type': 'info' or 'alias'},
                ...
            ]
        }
    """
    itsi_entity = ItsiEntity(session_key, current_user)
    # metadata is used to pass parameters to instrumentation
    metadata = {}
    with itsi_entity._instrumentation.track('ItsiEntity.dimensionsSummary',
                                            owner=owner, metric_info=metadata):
        # filter out retired entities
        all_active_entities = itsi_entity.get_bulk(owner, filter_data={'retired': {'$ne': 1}})
        dimensions_summary = _build_dimensions_summary(all_active_entities)
        metadata['numOfKeys'] = len(dimensions_summary.keys())
        metadata['numOfValues'] = sum([len(v) for v in dimensions_summary.values()])
    return dimensions_summary
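
# Illustrative (hypothetical) dimensions summary produced above, for two active
# entities sharing a 'host' alias field and an 'os' info field:
#   {
#       'host': [{'value': 'web01', 'field_type': 'alias'},
#                {'value': 'web02', 'field_type': 'alias'}],
#       'os': [{'value': 'linux', 'field_type': 'info'}]
#   }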