# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import http.client
from copy import deepcopy

from ITOA.itoa_object import ItoaObject
from ITOA.itoa_common import get_object_batch_size
from itsi.itsi_utils import ITOAInterfaceUtils
from ITOA.setup_logging import logger
from ITOA.controller_utils import ObjectOperation, ITOAError
from itsi.duplicate_entities_manager import constants


class ItsiDuplicateAliasesCache(ItoaObject):
    """
    Implements the KV store collection itsi_duplicate_aliases_cache.

    Fields in this collection are:
    _key, alias_type, count, duplicate_entities, duplicate_type,
    plus the audit fields _user and _version.
    """

    def __init__(self, session_key, current_user_name='nobody', owner='nobody', transaction_id=None):
        self.collection_name = 'itsi_duplicate_aliases_cache'
        self.session_key = session_key
        self.owner = owner
        self.transaction_id = transaction_id
        self.current_user_name = current_user_name
        super(ItsiDuplicateAliasesCache, self).__init__(
            self.session_key,
            current_user_name,
            self.collection_name,
            collection_name=self.collection_name,
            title_validation_required=False
        )

    def additional_setup(self, entry):
        """
        Augment an entry with contextual fields before it is stored in the
        KV collection itsi_duplicate_aliases_cache.

        @type entry: dict
        @param entry: The entry to be written to the KV collection

        @rtype: dict
        @return: The entry augmented with additional fields for the KV collection

        Example:
            entry = {
                '_key': 'title=akron.usa.com',
                'entities': ['005t-eeryh-uitit-jjjj']
            }
            returns:
            augmented_entry = {
                '_key': 'title=akron.usa.com',
                'entities': ['005t-eeryh-uitit-jjjj'],
                '_version': '4.20.0',
                '_user': 'nobody'
            }
        """
        augmented_entry = deepcopy(entry)
        augmented_entry['_user'] = 'nobody'
        augmented_entry['_version'] = ITOAInterfaceUtils.get_app_version(self.session_key, 'itsi', 'nobody')
        return augmented_entry

    def validate_and_massage_data(self, entry):
        """
        Validate an entry and augment it with contextual fields before it is
        written to the KV collection itsi_duplicate_aliases_cache.
        Raises an ITOAError if the entry has no duplicate entities.

        @type entry: dict
        @param entry: The entry to be written to the KV collection

        @rtype: dict
        @return: The validated and augmented entry
        """
        if not entry['duplicate_entities']:
            object_id = entry['_key']
            message = 'Duplicate aliases cache was not formed correctly'
            logger.error('{} for object_id: {}'.format(message, object_id))
            raise ITOAError(status=http.client.BAD_REQUEST, message=message,
                            uid='IIM-ENTTY-NMZ_017', context={'object_id': object_id})
        augmented_entry = self.additional_setup(entry)
        return augmented_entry
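
    # Illustrative sketch of the validate-and-massage flow (the '_version'
    # value shown is hypothetical; it is whatever get_app_version returns):
    # a well-formed cache entry passes validation and picks up the audit
    # fields, while an entry with an empty 'duplicate_entities' list raises
    # an ITOAError with uid IIM-ENTTY-NMZ_017.
    #
    #     cache = ItsiDuplicateAliasesCache(session_key)
    #     entry = {
    #         '_key': 'title=akron.usa.com',
    #         'duplicate_entities': ['005t-eeryh-uitit-jjjj'],
    #     }
    #     massaged = cache.validate_and_massage_data(entry)
    #     # massaged == {
    #     #     '_key': 'title=akron.usa.com',
    #     #     'duplicate_entities': ['005t-eeryh-uitit-jjjj'],
    #     #     '_user': 'nobody',
    #     #     '_version': '4.20.0',
    #     # }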

    def write_data_to_kv(self, massaged_data_list):
        """
        Write a batch of massaged entries to the KV collection
        itsi_duplicate_aliases_cache. Raises an ITOAError if the write fails.

        @type massaged_data_list: list
        @param massaged_data_list: The entries to be written to the KV collection
        """
        if not massaged_data_list:
            return
        try:
            super().save_batch(owner=self.current_user_name, data_list=massaged_data_list, validate_names=True)
        except Exception as e:
            object_id = massaged_data_list
            message = 'Error while storing entries in KV Collection'
            logger.exception('{} for object_id: {}, {}'.format(message, object_id, e))
            raise ITOAError(status=http.client.BAD_REQUEST, message=message,
                            uid='IIM-ENTTY-NMZ_018', context={'object_id': object_id}) from e

    def get_bulk_from_kv(self, owner='nobody', filter_data=None, fields=None):
        """
        Read entries from the KV collection itsi_duplicate_aliases_cache.

        @type owner: basestring
        @param owner: The owner of the collection

        @type filter_data: dict
        @param filter_data: Filter applied when fetching entries from the collection

        @type fields: list
        @param fields: Optional list of fields to fetch for each entry
        """
        try:
            filtered_data = super().get_bulk(owner=owner, filter_data=filter_data, fields=fields,
                                             req_source='get_bulk_itsi_duplicate_alias_cache')
            return filtered_data
        except Exception as e:
            requested_filter = filter_data
            message = 'Error while reading entries {} from KV Collection {}'.format(
                requested_filter, self.collection_name)
            logger.exception('{} for filter_data: {}, {}'.format(message, requested_filter, e))
            raise ITOAError(status=http.client.BAD_REQUEST, message=message,
                            uid='IIM-ENTTY-NMZ_019', context={'filter_data': requested_filter}) from e

    def retrieve_unique_entity_key_list(self, limit, skip_count, log_prefix='[ItsiDuplicateAliasesCache]'):
        """
        Batch read one page of itsi_duplicate_aliases_cache and generate a
        unique list of duplicate entity keys for that page.

        :param limit: Batch read size
        :param skip_count: Number of objects to skip (pagination offset)
        :param log_prefix: Prefix to include in log messages
        :return: (unique list of duplicate entity keys, number of processed objects), e.g.
                 (['15e4a7e5-beaf-48b9-8e1f-94824fbee6fe', '49d0301d-bec8-4977-91b9-6e21aee63653', ...], 100),
                 or None when the page is empty and there is nothing left to read
        """
        unique_entity_key_list = set()
        try:
            logger.info(f'tid={self.transaction_id}, {log_prefix} batch read from alias cache:'
                        f' batch_size={limit}, skip_count={skip_count}')
            chunked_alias_objects = self.get_bulk(self.owner,
                                                  fields=['_key', 'alias_type', 'duplicate_entities'],
                                                  limit=limit,
                                                  skip=skip_count,
                                                  req_source='ItsiDuplicateAliasesCacheGet',
                                                  transaction_id=self.transaction_id)
            logger.debug(f'tid={self.transaction_id}, {log_prefix}'
                         f' Alias cache objects list: {[alias["_key"] for alias in chunked_alias_objects]}')
            if not chunked_alias_objects:
                logger.info(f'tid={self.transaction_id}, {log_prefix} No more alias cache objects returned.')
                return None
            for alias in chunked_alias_objects:
                unique_entity_key_list.update(alias['duplicate_entities'])
            return list(unique_entity_key_list), len(chunked_alias_objects)
        except Exception as ex:
            logger.exception(f'tid={self.transaction_id}, {log_prefix}'
                             f' Encountered exception while reading alias cache in batch: {ex}')
            raise
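
    # Pagination sketch (illustrative; the batch size of 500 is an assumed
    # value): callers are expected to walk the cache in pages until an empty
    # page is reached, accumulating the unique entity keys from each batch.
    #
    #     cache = ItsiDuplicateAliasesCache(session_key)
    #     limit, skip, all_keys = 500, 0, set()
    #     while True:
    #         result = cache.retrieve_unique_entity_key_list(limit, skip)
    #         if result is None:
    #             break  # empty page: nothing left to read
    #         keys, processed = result
    #         all_keys.update(keys)
    #         skip += processed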

    def remove_non_duplicate_aliases(self, log_prefix='[ItsiDuplicateAliasesCache]'):
        """
        After the aliases cache has been processed, batch processing leaves
        behind cache objects with count=0 or count=1. These objects do not
        represent duplicated entities, so they are removed from the cache.

        :param log_prefix: Prefix to include in log messages
        """
        try:
            logger.info(f'tid={self.transaction_id}, {log_prefix} Start process of removing non duplicate aliases.')
            removal_filter_data = {
                '$or': [{'count': val} for val in [0, 1]]
            }
            self.delete_bulk(self.owner,
                             filter_data=removal_filter_data,
                             req_source='ItsiDuplicateAliasesCacheDelete',
                             transaction_id=self.transaction_id)
        except Exception as ex:
            logger.exception(f'tid={self.transaction_id}, {log_prefix}'
                             f' Encountered exception when removing non duplicates from'
                             f' itsi_duplicate_aliases_cache: {ex}')
            raise

    def fetch_count_for_given_duplicate_category(self, session_key, duplicate_category_name, duplicate_type_field,
                                                 log_prefix='[ItsiDuplicateAliasesCache]'):
        """
        @param session_key: session key used for fetching data
        @param duplicate_category_name: one of 'title/merge_key' OR 'alias'
        @param duplicate_type_field: one of 'title' or 'alias'
        @param log_prefix: prefix to include in log messages
        @return: Total unique count of duplicate entities for the given category
        """
        logger.debug(f'tid={self.transaction_id}, {log_prefix}'
                     f' Fetching total count of unique entities for given category: {duplicate_category_name}')
        duplicate_type_filter = {'alias_type': duplicate_category_name}
        try:
            fetch_limit = get_object_batch_size(self.session_key, 'alias_cache_batch_read_size')
            skip = 0
            unique_entity_ids = set()
            while True:
                aliases_that_match_category = super().get_bulk(owner='nobody',
                                                               filter_data=duplicate_type_filter,
                                                               fields=['count', '_key', 'duplicate_type'],
                                                               skip=skip,
                                                               limit=fetch_limit)
                if not aliases_that_match_category:
                    break
                skip += fetch_limit
                for alias_entry in aliases_that_match_category:
                    unique_entity_ids.update(alias_entry['duplicate_type'][duplicate_type_field]['entities'])
            return len(unique_entity_ids)
        except Exception as e:
            requested_filter = duplicate_type_filter
            message = 'Error while reading entries {} from KV Collection {}'.format(
                requested_filter, self.collection_name)
            logger.exception('{} for filter_data: {}, {}'.format(message, requested_filter, e))
            raise ITOAError(status=http.client.BAD_REQUEST, message=message,
                            uid='IIM-ENTTY-NMZ_019', context={'filter_data': requested_filter}) from e
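
# Illustrative shape of a cache object as traversed by
# fetch_count_for_given_duplicate_category (field values are hypothetical;
# only the keys actually read by the methods above are shown):
#
#     {
#         '_key': 'title=akron.usa.com',
#         'alias_type': 'title/merge_key',
#         'count': 2,
#         'duplicate_type': {
#             'title': {
#                 'entities': ['15e4a7e5-beaf-48b9-8e1f-94824fbee6fe',
#                              '49d0301d-bec8-4977-91b9-6e21aee63653'],
#             },
#         },
#     }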