# NOTE(review): the following lines are web-scrape artifacts (repository UI
# banner), not source code; commented out to keep the file parseable.
# You can not select more than 25 topics
# Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
# 205 lines / 9.9 KiB
# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
|
|
import http.client
|
|
from copy import deepcopy
|
|
|
|
from ITOA.itoa_object import ItoaObject
|
|
from ITOA.itoa_common import get_object_batch_size
|
|
from itsi.itsi_utils import ITOAInterfaceUtils
|
|
from ITOA.setup_logging import logger
|
|
from ITOA.controller_utils import ObjectOperation, ITOAError
|
|
from itsi.duplicate_entities_manager import constants
|
|
|
|
|
|
class ItsiDuplicateAliasesCache(ItoaObject):
    """
    Implements KV store collection itsi_duplicate_aliases_cache

    Each entry in this collection maps a duplicated alias/title value to the
    list of entity keys that share it, e.g.::

        {
            '_key': 'title=akron.usa.com',
            'duplicate_entities': ['005t-eeryh-uitit-jjjj'],
            'count': 1
        }
    """

    def __init__(self, session_key, current_user_name='nobody', owner='nobody', transaction_id=None):
        """
        @type session_key: basestring
        @param session_key: splunkd session key used for KV store access

        @type current_user_name: basestring
        @param current_user_name: user on whose behalf operations are performed

        @type owner: basestring
        @param owner: owner namespace for KV store reads/writes

        @type transaction_id: basestring
        @param transaction_id: optional id used to correlate log lines
        """
        self.collection_name = 'itsi_duplicate_aliases_cache'
        self.session_key = session_key
        self.owner = owner
        self.transaction_id = transaction_id
        self.current_user_name = current_user_name
        # NOTE(review): the collection name is passed both positionally and as a
        # keyword argument; preserved as-is because ItoaObject's signature is
        # not visible from this file — confirm before simplifying.
        super(ItsiDuplicateAliasesCache, self).__init__(self.session_key,
                                                        current_user_name,
                                                        self.collection_name,
                                                        collection_name=self.collection_name,
                                                        title_validation_required=False)

    def additional_setup(self, entry):
        """
        Augment entry with more fields for contextual information
        before storing in the KV collection - itsi_duplicate_aliases_cache

        The input entry is not mutated; a deep copy is returned.

        @type: object
        @param entry The entry to be written to the KV collection - itsi_duplicate_aliases_cache

        @rtype: object
        @return The entry augmented with '_user' and '_version' fields

        Example:
            entry = {
                '_key': 'title=akron.usa.com',
                'entities': ['005t-eeryh-uitit-jjjj']
            }
        Returns:
            augmented_entry = {
                '_key': 'title=akron.usa.com',
                'entities': ['005t-eeryh-uitit-jjjj'],
                '_version': '4.20.0',
                '_user': 'nobody'
            }
        """
        augmented_entry = deepcopy(entry)
        augmented_entry['_user'] = 'nobody'
        augmented_entry['_version'] = ITOAInterfaceUtils.get_app_version(self.session_key, 'itsi', 'nobody')
        return augmented_entry

    def validate_and_massage_data(self, entry):
        """
        Validate an entry destined for KV collection - itsi_duplicate_aliases_cache
        and return the augmented (massaged) form.

        @type: object
        @param entry The entry to be written to the KV collection - itsi_duplicate_aliases_cache

        @rtype: object
        @return The entry augmented via additional_setup

        @raise ITOAError: if the entry has an empty 'duplicate_entities' list
        """
        if not entry['duplicate_entities']:
            object_id = entry['_key']
            message = 'Duplicate aliases cache was not formed correctly'
            # logger.error, not logger.exception: we are not inside an except
            # block, so exception() would log a meaningless traceback.
            logger.error('{} for object_id: {}'.format(message, object_id))
            raise ITOAError(status=http.client.BAD_REQUEST, message=message, uid='IIM-ENTTY-NMZ_017',
                            context={'object_id': object_id})
        augmented_entry = self.additional_setup(entry)
        return augmented_entry

    def write_data_to_kv(self, massaged_data_list):
        """
        Persist a batch of massaged entries into the KV collection.

        No-op for an empty list.

        @type massaged_data_list: list
        @param massaged_data_list: entries already processed by validate_and_massage_data

        @raise ITOAError: wraps any failure from the underlying batch save
        """
        if not massaged_data_list:
            return
        try:
            super().save_batch(owner=self.current_user_name, data_list=massaged_data_list, validate_names=True)
        except Exception as e:
            object_id = massaged_data_list
            message = ('Error while storing entries {} in KV Collection'.format(object_id))
            logger.exception('{} for object_id: {}, {}'.format(message, object_id, e))
            raise ITOAError(status=http.client.BAD_REQUEST, message=message, uid='IIM-ENTTY-NMZ_018',
                            context={'object_id': object_id}) from e

    def get_bulk_from_kv(self, owner='nobody', filter_data=None, fields=None):
        """
        Read entries from KV collection: itsi_duplicate_aliases_cache

        @param owner: The owner of the collection
        @type: basestring
        @param filter_data: The data that has to be filtered and fetched from the collection
        @type: dict
        @param fields: optional list of field names to project in the result
        @type: list

        @return: list of matching entries

        @raise ITOAError: wraps any failure from the underlying bulk read
        """
        try:
            # Forward `fields` to get_bulk — previously accepted but silently
            # ignored, so callers requesting a projection got full documents.
            filtered_data = super().get_bulk(owner=owner, filter_data=filter_data,
                                             fields=fields,
                                             req_source='get_bulk_itsi_duplicate_alias_cache')
            return filtered_data
        except Exception as e:
            filter_obj = filter_data
            message = ('Error while reading entries {} from KV Collection {}'.format(filter_obj, self.collection_name))
            logger.exception('{} for object_id: {}, {}'.format(message, filter_obj, e))
            raise ITOAError(status=http.client.BAD_REQUEST, message=message, uid='IIM-ENTTY-NMZ_019',
                            context={'filter_data': filter_obj}) from e

    def retrieve_unique_entity_key_list(self, limit, skip_count, log_prefix='[ItsiDuplicateAliasesCache]'):
        """
        Batch read and process itsi_duplicate_aliases_cache to generate unique duplicate key list

        :param limit: Batch read size
        :param skip_count: Pagination of batch
        :param log_prefix: prefix message to include in the logs
        :return: (unique list of duplicate entity keys, length of processed objects):
            (['15e4a7e5-beaf-48b9-8e1f-94824fbee6fe', '49d0301d-bec8-4977-91b9-6e21aee63653',...], 100)
            Returns None when the batch read yields no more objects — callers
            must treat None as the pagination end signal.
        """
        unique_entity_keys = set()

        try:
            logger.info(f'tid={self.transaction_id}, {log_prefix} batch read from alias cache:'
                        f' batch_size={limit}, skip_count={skip_count}')
            chunked_alias_objects = self.get_bulk(self.owner,
                                                  fields=['_key', 'alias_type', 'duplicate_entities'],
                                                  limit=limit,
                                                  skip=skip_count,
                                                  req_source="ItsiDuplicateAliasesCacheGet",
                                                  transaction_id=self.transaction_id)

            logger.debug(f'tid={self.transaction_id}, {log_prefix}'
                         f' Alias cache objects list: {[alias["_key"] for alias in chunked_alias_objects]}')

            if not chunked_alias_objects:
                logger.info(f'tid={self.transaction_id}, {log_prefix} No more alias cache objects returned.')
                return

            for alias in chunked_alias_objects:
                unique_entity_keys.update(alias['duplicate_entities'])

            return (list(unique_entity_keys), len(chunked_alias_objects))

        except Exception as ex:
            logger.exception(f'tid={self.transaction_id}, {log_prefix}'
                             f' Encountered exception while read alias cache in batch: {ex}')
            # Bare raise preserves the original traceback.
            raise

    def remove_non_duplicate_aliases(self, log_prefix='[ItsiDuplicateAliasesCache]'):
        """
        After processing the aliases cache, there are count=1,0 objects in the cache due to batch processing,
        and these cache objects needs to be removed as it's not duplicated entities.

        :param log_prefix: prefix message to include in the logs
        :return: None (the bulk delete does not report a removal count)
        """
        try:
            logger.info(f'tid={self.transaction_id}, {log_prefix} Start process of removing non duplicate aliases.')
            # Match cache entries whose alias is shared by at most one entity.
            removal_filter_data = {
                '$or': [{'count': val} for val in (0, 1)]
            }
            self.delete_bulk(self.owner,
                             filter_data=removal_filter_data,
                             req_source='ItsiDuplicateAliasesCacheDelete',
                             transaction_id=self.transaction_id)

        except Exception as ex:
            logger.exception(f'tid={self.transaction_id}, {log_prefix}'
                             f' Encountered exception when remove non duplicates from itsi_duplicate_aliases_cache: {ex}')
            # Bare raise preserves the original traceback.
            raise

    def fetch_count_for_given_duplicate_category(self, session_key, duplicate_category_name, duplicate_type_field, log_prefix='[ItsiDuplicateAliasesCache]'):
        """
        Count unique entities that fall under one duplicate category.

        @param session_key: session_key used for fetching data
            NOTE(review): currently unused — self.session_key is used instead;
            kept for interface compatibility.
        @param duplicate_category_name: one of 'title/merge_key' OR 'alias'
        @param duplicate_type_field: one of 'title' or 'alias'
        @param log_prefix: prefix message to include in the logs
        @return: Total unique count of duplicates for a given category

        @raise ITOAError: wraps any failure from the underlying bulk read
        """
        logger.debug(f'tid={self.transaction_id}, {log_prefix}'
                     f' Fetching total count of unique entities for given category: {duplicate_category_name}')
        duplicate_type_filter = {"alias_type": duplicate_category_name}

        try:
            fetch_limit = get_object_batch_size(self.session_key, 'alias_cache_batch_read_size')
            skip = 0
            # Accumulate in a set: O(1) membership instead of rebuilding
            # list(set(...)) after every batch.
            unique_entity_ids = set()

            while True:
                aliases_that_match_category = super().get_bulk(owner='nobody',
                                                               filter_data=duplicate_type_filter,
                                                               fields=['count', '_key', 'duplicate_type'], skip=skip,
                                                               limit=fetch_limit)
                if not aliases_that_match_category:
                    break
                skip += fetch_limit
                for alias_entry in aliases_that_match_category:
                    unique_entity_ids.update(alias_entry['duplicate_type'][duplicate_type_field]['entities'])

            return len(unique_entity_ids)
        except Exception as e:
            filter_obj = duplicate_type_filter
            message = ('Error while reading entries {} from KV Collection {}'.format(filter_obj, self.collection_name))
            logger.exception('{} for object_id: {}, {}'.format(message, filter_obj, e))
            raise ITOAError(status=http.client.BAD_REQUEST, message=message, uid='IIM-ENTTY-NMZ_019',
                            context={'filter_data': filter_obj}) from e