You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

205 lines
9.9 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import http.client
from copy import deepcopy
from ITOA.itoa_object import ItoaObject
from ITOA.itoa_common import get_object_batch_size
from itsi.itsi_utils import ITOAInterfaceUtils
from ITOA.setup_logging import logger
from ITOA.controller_utils import ObjectOperation, ITOAError
from itsi.duplicate_entities_manager import constants
class ItsiDuplicateAliasesCache(ItoaObject):
    """
    Implements KV store collection itsi_duplicate_aliases_cache.

    Each cache entry maps one duplicate alias value to the entities sharing it.
    Fields observed in use within this class: '_key', 'duplicate_entities',
    'alias_type', 'count', 'duplicate_type', plus '_user' and '_version' added
    on write. (Field set inferred from usage here — confirm against writers.)
    """

    def __init__(self, session_key, current_user_name='nobody', owner='nobody', transaction_id=None):
        """
        @param session_key: splunkd session key used for all KV store access
        @param current_user_name: name of the user performing the operations
        @param owner: owner namespace used for batch reads/deletes
        @param transaction_id: optional id used to correlate log lines
        """
        self.collection_name = 'itsi_duplicate_aliases_cache'
        self.session_key = session_key
        self.owner = owner
        self.transaction_id = transaction_id
        self.current_user_name = current_user_name
        super(ItsiDuplicateAliasesCache, self).__init__(self.session_key,
                                                        current_user_name,
                                                        self.collection_name,
                                                        collection_name=self.collection_name,
                                                        title_validation_required=False)

    def additional_setup(self, entry):
        """
        Augment entry with more fields for contextual information
        before storing in the KV collection - itsi_duplicate_aliases_cache

        @type: object
        @param entry: The entry to be written to the KV collection
        @rtype: object
        @return: A deep copy of the entry augmented with '_user' and '_version'

        Example:
            entry = {
                '_key': 'title=akron.usa.com',
                'entities': ['005t-eeryh-uitit-jjjj']
            }
            Returns:
            augmented_entry = {
                '_key': 'title=akron.usa.com',
                'entities': ['005t-eeryh-uitit-jjjj'],
                '_version': '4.20.0',
                '_user': 'nobody'
            }
        """
        # Deep copy so the caller's entry is not mutated.
        augmented_entry = deepcopy(entry)
        augmented_entry['_user'] = 'nobody'
        augmented_entry['_version'] = ITOAInterfaceUtils.get_app_version(self.session_key, 'itsi', 'nobody')
        return augmented_entry

    def validate_and_massage_data(self, entry):
        """
        Validate an entry destined for the KV collection and return its
        augmented form.

        @type: object
        @param entry: The entry to be written to the KV collection
        @rtype: object
        @return: The entry augmented via additional_setup()
        @raise ITOAError: when the entry's 'duplicate_entities' list is empty
        """
        if not entry['duplicate_entities']:
            object_id = entry['_key']
            message = 'Duplicate aliases cache was not formed correctly'
            # logger.error, not logger.exception: no exception is active here,
            # so exception() would emit a bogus 'NoneType: None' traceback.
            logger.error('{} for object_id: {}'.format(message, object_id))
            raise ITOAError(status=http.client.BAD_REQUEST, message=message, uid='IIM-ENTTY-NMZ_017',
                            context={'object_id': object_id})
        augmented_entry = self.additional_setup(entry)
        return augmented_entry

    def write_data_to_kv(self, massaged_data_list):
        """
        Persist a batch of massaged entries to the KV collection.

        @type: list
        @param massaged_data_list: entries produced by validate_and_massage_data
        @raise ITOAError: wrapping any failure reported by save_batch
        """
        if not massaged_data_list:
            return
        try:
            super().save_batch(owner=self.current_user_name, data_list=massaged_data_list, validate_names=True)
        except Exception as e:
            # Log only the entry keys, not the full payloads, to keep logs readable.
            object_id = [entry.get('_key') for entry in massaged_data_list]
            message = ('Error while storing entries {} in KV Collection'.format(object_id))
            logger.exception('{} for object_id: {}, {}'.format(message, object_id, e))
            raise ITOAError(status=http.client.BAD_REQUEST, message=message, uid='IIM-ENTTY-NMZ_018',
                            context={'object_id': object_id}) from e

    def get_bulk_from_kv(self, owner='nobody', filter_data=None, fields=None):
        """
        Read entries from KV collection: itsi_duplicate_aliases_cache

        @param owner: The owner of the collection
        @type: basestring
        @param filter_data: The data that has to be filtered and fetched from the collection
        @type: dict
        @param fields: optional list of field names to project in the result
        @type: list
        @return: the matching entries
        @raise ITOAError: wrapping any failure reported by get_bulk
        """
        try:
            # Bug fix: 'fields' was previously accepted but never forwarded,
            # so callers always received full documents.
            filtered_data = super().get_bulk(owner=owner, filter_data=filter_data, fields=fields,
                                             req_source='get_bulk_itsi_duplicate_alias_cache')
            return filtered_data
        except Exception as e:
            message = ('Error while reading entries {} from KV Collection {}'.format(filter_data, self.collection_name))
            logger.exception('{} for object_id: {}, {}'.format(message, filter_data, e))
            raise ITOAError(status=http.client.BAD_REQUEST, message=message, uid='IIM-ENTTY-NMZ_019',
                            context={'filter_data': filter_data}) from e

    def retrieve_unique_entity_key_list(self, limit, skip_count, log_prefix='[ItsiDuplicateAliasesCache]'):
        """
        Batch read and process itsi_duplicate_aliases_cache to generate unique duplicate key list

        :param limit: Batch read size
        :param skip_count: Pagination offset of the batch
        :param log_prefix: prefix message to include in the logs
        :return: (unique list of duplicate entity keys, number of objects processed), e.g.
                 (['15e4a7e5-beaf-48b9-8e1f-94824fbee6fe', ...], 100), or None
                 when no objects remain at this offset (end-of-pagination
                 sentinel — callers should distinguish None from a tuple).
        """
        unique_entity_keys = set()
        try:
            logger.info(f'tid={self.transaction_id}, {log_prefix} batch read from alias cache:'
                        f' batch_size={limit}, skip_count={skip_count}')
            alias_batch = self.get_bulk(self.owner,
                                        fields=['_key', 'alias_type', 'duplicate_entities'],
                                        limit=limit,
                                        skip=skip_count,
                                        req_source="ItsiDuplicateAliasesCacheGet",
                                        transaction_id=self.transaction_id)
            logger.debug(f'tid={self.transaction_id}, {log_prefix}'
                         f' Alias cache objects list: {[alias["_key"] for alias in alias_batch]}')
            if not alias_batch:
                # Returning None (not an empty tuple) signals end of pagination.
                logger.info(f'tid={self.transaction_id}, {log_prefix} No more alias cache objects returned.')
                return
            for alias in alias_batch:
                unique_entity_keys.update(alias['duplicate_entities'])
            return (list(unique_entity_keys), len(alias_batch))
        except Exception as ex:
            logger.exception(f'tid={self.transaction_id}, {log_prefix}'
                             f' Encountered exception while read alias cache in batch: {ex}')
            # Bare raise preserves the original traceback.
            raise

    def remove_non_duplicate_aliases(self, log_prefix='[ItsiDuplicateAliasesCache]'):
        """
        After processing the aliases cache, entries with count 0 or 1 remain in
        the cache due to batch processing; they do not represent duplicated
        entities and are deleted here.

        :param log_prefix: prefix message to include in the logs
        :return: None (the result of delete_bulk is not propagated)
        """
        try:
            logger.info(f'tid={self.transaction_id}, {log_prefix} Start process of removing non duplicate aliases.')
            # Match cache objects whose duplicate count is 0 or 1.
            removal_filter_data = {
                '$or': [{'count': val} for val in (0, 1)]
            }
            self.delete_bulk(self.owner,
                             filter_data=removal_filter_data,
                             req_source='ItsiDuplicateAliasesCacheDelete',
                             transaction_id=self.transaction_id)
        except Exception as ex:
            logger.exception(f'tid={self.transaction_id}, {log_prefix}'
                             f' Encountered exception when remove non duplicates from itsi_duplicate_aliases_cache: {ex}')
            # Bare raise preserves the original traceback.
            raise

    def fetch_count_for_given_duplicate_category(self, session_key, duplicate_category_name, duplicate_type_field, log_prefix='[ItsiDuplicateAliasesCache]'):
        """
        Count the unique entities involved in duplicates of a given category.

        @param session_key: retained for interface compatibility; this method
                            actually reads self.session_key
                            (NOTE(review): confirm callers always pass the same
                            key, then drop the parameter)
        @param duplicate_category_name: one of 'title/merge_key' OR 'alias'
        @param duplicate_type_field: one of 'title' or 'alias'
        @param log_prefix: prefix message to include in the logs
        @return: Total unique count of duplicates for a given category
        @raise ITOAError: wrapping any failure while reading the collection
        """
        logger.debug(f'tid={self.transaction_id}, {log_prefix}'
                     f' Fetching total count of unique entities for given category: {duplicate_category_name}')
        duplicate_type_filter = {"alias_type": duplicate_category_name}
        try:
            fetch_limit = get_object_batch_size(self.session_key, 'alias_cache_batch_read_size')
            skip = 0
            # Accumulate into a set once, instead of re-running list(set(...))
            # on a growing list every batch (accidentally O(n^2)).
            unique_entity_ids = set()
            while True:
                matching_aliases = super().get_bulk(owner='nobody',
                                                    filter_data=duplicate_type_filter,
                                                    fields=['count', '_key', 'duplicate_type'], skip=skip,
                                                    limit=fetch_limit)
                if not matching_aliases:
                    break
                skip += fetch_limit
                for alias_entry in matching_aliases:
                    unique_entity_ids.update(alias_entry['duplicate_type'][duplicate_type_field]['entities'])
            return len(unique_entity_ids)
        except Exception as e:
            message = ('Error while reading entries {} from KV Collection {}'.format(duplicate_type_filter, self.collection_name))
            logger.exception('{} for object_id: {}, {}'.format(message, duplicate_type_filter, e))
            raise ITOAError(status=http.client.BAD_REQUEST, message=message, uid='IIM-ENTTY-NMZ_019',
                            context={'filter_data': duplicate_type_filter}) from e