
# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import ITOA.itoa_common as utils
from ITOA.setup_logging import logger
from ITOA.itoa_object import ItoaObject, CRUDMethodTypes
from ITOA.itoa_exceptions import ItoaValidationError
from ITOA.controller_utils import ITOAError
from .itsi_entity import ItsiEntity
from ITOA.setup_logging import InstrumentCall
class ItsiEntityRelationship(ItoaObject):
    """
    Implements ITSI entity relationship.

    A relationship is persisted as a "triple" of subject identifier, object
    identifier and predicate (e.g. "entityA hosts entityB").  The triple must
    be unique both within a write batch and across the KV Store collection.
    """

    collection_name = 'itsi_entity_relationships'
    itoa_object_type = 'entity_relationship'
    # The fields that together uniquely identify a relationship record
    triple_fields = ['subject_identifier', 'object_identifier', 'predicate']

    def __init__(self, session_key, current_user_name):
        """
        @type session_key: basestring
        @param session_key: splunkd session key for KV Store access

        @type current_user_name: basestring
        @param current_user_name: user performing operations with this object
        """
        # Title validation is skipped: relationships are identified by their
        # triple, not by a title.
        super(ItsiEntityRelationship, self).__init__(
            session_key,
            current_user_name,
            self.itoa_object_type,
            collection_name=self.collection_name,
            title_validation_required=False
        )

    def ensure_required_fields(self, objects):
        """
        Modify the objects passed in by reference to ensure they have the system generated required fields
        Update the specific fields for create, update and batch_save

        @type objects: list[dict]
        @param objects: list of dict

        @return: None
        """
        # By default, itoa object has mod_source and mod_timestamp fields.
        # But for entity relationship data type, we want to have them as a dict nested inside a list named as "mod",
        # for example:
        # {
        #     "mod": [{
        #         "mod_source": <string>,
        #         "mod_timestamp": <time>
        #     }]
        # }
        for json_data in objects:
            if 'mod' not in json_data:
                json_data['mod'] = []

            # If there is no match mod_source field, create a new dict with mod_source and mod_timestamp,
            # and append it to the list
            if not any('mod_source' in d and d['mod_source'] == self.mod_method for d in json_data['mod']):
                json_data['mod'].append({
                    'mod_source': self.mod_method,
                    'mod_timestamp': utils.get_current_timestamp_utc()
                })
            else:
                # Find match mod_source field, update its corresponding mod_timestamp to latest time.
                match = next((d for d in json_data['mod'] if d['mod_source'] == self.mod_method), None)
                if match is not None:
                    match['mod_timestamp'] = utils.get_current_timestamp_utc()

            # Drop the flat mod_source field (presumably set by the base class)
            # since it has been folded into the "mod" list above.
            json_data.pop('mod_source', None)
            json_data['_version'] = self._version

    def _get_triple_set_from_list(self, triple_list):
        """
        Given a list of triple dict, get a set of triple dict where
        duplicated triple is removed

        @type triple_list: list[dict]
        @param triple_list: list of triple dict

        @return: set[tuple]

        For example, triple_list is:
            [{
                'subject_identifier': 'entityA - 69228',
                'object_identifier': 'entityB - 64864',
                'predicate': 'hosts'
            },
            {
                'subject_identifier': 'entityA - 69228',
                'object_identifier': 'entityB - 64864',
                'predicate': 'hosts'
            }]
        return:
            set([(
                ('subject_identifier', 'entityA - 69228'),
                ('object_identifier', 'entityB - 64864'),
                ('predicate', 'hosts')
            )])
        """
        # Dicts are unhashable, so each triple is converted to a tuple of its
        # items before being placed in the set.
        return set(tuple(x.items()) for x in triple_list)

    def _get_triple_list_from_set(self, triple_set):
        """
        Given a set of triple tuple, get a list of triple dict.

        @type triple_set: set[tuple]
        @param triple_set: set of triple tuple

        @return: list[dict]

        For example, triple_set is:
            set([(
                ('subject_identifier', 'entityA - 69228'),
                ('object_identifier', 'entityB - 64864'),
                ('predicate', 'hosts')
            )])
        return
            [{
                'subject_identifier': 'entityA - 69228',
                'object_identifier': 'entityB - 64864',
                'predicate': 'hosts'
            }]
        """
        return [dict(x) for x in triple_set]

    def _get_triples_in_data(self, objects):
        """
        Get triples in objects.

        @type objects: list
        @param objects: list of objects

        @return: tuple(list, list, list).
            The lists are triples with invalid name, triples with valid name, and triples with valid name and its key,

        @raise: bad-validation error when any object is missing a triple field
        """
        invalid_names_triple_list = []
        valid_names_triple_list = []
        valid_triple_plus_key_list = []
        for json_data in objects:
            # Every triple field must be present; otherwise the object cannot
            # be uniquely identified.
            if any(k not in json_data for k in self.triple_fields):
                self.raise_error_bad_validation(logger, 'The triple ({0}) is not completely specified in {1}'.format(
                    ', '.join(self.triple_fields), json_data
                ))
            triple = {k: json_data[k] for k in self.triple_fields}
            for field_name, field_value in triple.items():
                if not utils.is_valid_name(field_value):
                    invalid_names_triple_list.append(dict(triple))
                    break
            else:
                # If all field values in a triple are valid
                valid_names_triple_list.append(dict(triple))
                # Need _key with a triple to build filter later
                triple_plus_key = dict(triple)
                triple_plus_key.update({'_key': json_data.get('_key', '')})
                valid_triple_plus_key_list.append(triple_plus_key)
        return invalid_names_triple_list, valid_names_triple_list, valid_triple_plus_key_list

    def _build_filter_from_triple_key_list(self, valid_triple_plus_key_list):
        """
        Build a list of dict as filter data, given a list of dict that contains triple and key.

        @type valid_triple_plus_key_list: list[dict]
        @param valid_triple_plus_key_list: a list of dict that contains triple and its key

        @return: list[dict], as filter data
        """
        triple_filter = []
        for triple_plus_key in valid_triple_plus_key_list:
            filter_dict = {k: triple_plus_key[k] for k in self.triple_fields}
            # Exclude the object's own key so an update of an existing record
            # does not collide with itself.
            filter_dict.update({'_key': {'$ne': triple_plus_key.get('_key', '')}})
            triple_filter.append({'$and': [filter_dict]})
        return triple_filter

    def _validate_triples(self, owner, objects, transaction_id=None):
        """
        Check for valid and unique triples for the objects, stored in the triple_fields.

        @type owner: string
        @param owner: user who is performing this operation

        @type objects: list
        @param objects: list of objects

        @type transaction_id: basestring
        @param transaction_id: transaction id for end-end tracing

        @return: None, throws exceptions on validations failing
        """
        # Guard against valid and duplicates within passed in objects
        invalid_names_triple_list, valid_names_triple_list, valid_triple_plus_key_list = \
            self._get_triples_in_data(objects)

        if len(invalid_names_triple_list) > 0:
            invalid_names_triple_set = self._get_triple_set_from_list(invalid_names_triple_list)
            self.raise_error_bad_validation(
                logger,
                'Names cannot contain equal and quote characters. List of triples with invalid names: {0}'
                .format(self._get_triple_list_from_set(invalid_names_triple_set))
            )
        del invalid_names_triple_list

        if len(valid_names_triple_list) == 0:
            self.raise_error_bad_validation(
                logger,
                'There is no triple ({0}) with valid names'.format(', '.join(self.triple_fields))
            )

        # Converting to a set collapses duplicates: a shorter set means the
        # batch contained the same triple more than once.
        valid_names_triple_set = self._get_triple_set_from_list(valid_names_triple_list)
        if len(valid_names_triple_set) < len(valid_names_triple_list):
            self.raise_error_bad_validation(
                logger,
                'Triple must be unique. There are duplicate triples in {0}'.format(valid_names_triple_list)
            )
        del valid_names_triple_list
        del valid_names_triple_set

        triple_filter = self._build_filter_from_triple_key_list(valid_triple_plus_key_list)

        # Now guard against duplicates against saved objects
        persisted_objects = self.get_bulk(
            owner,
            filter_data={'$or': triple_filter},
            fields=['_key'] + self.triple_fields,
            transaction_id=transaction_id
        )
        logger.debug(
            'filter_data=%s, persisted_objects=%s',
            {'$or': triple_filter},
            persisted_objects
        )

        duplicate_triple_list = []
        for persisted_object in persisted_objects:
            triple = {k: persisted_object[k] for k in self.triple_fields}
            duplicate_triple_list.append(dict(triple))
        duplicate_triple_set = self._get_triple_set_from_list(duplicate_triple_list)
        del duplicate_triple_list
        if len(duplicate_triple_set) > 0:
            self.raise_error_bad_validation(
                logger,
                'New triple specified already exist. Must use new unique triple. Duplicate triple found: {0}'.format(
                    self._get_triple_list_from_set(duplicate_triple_set)))
        del duplicate_triple_set

    def do_additional_setup(self, owner, objects, req_source='unknown', method=CRUDMethodTypes.METHOD_UPSERT,
                            transaction_id=None, skip_local_failure=False):
        """
        Any additional setup that is required to be done
        before a write operation (create or update) is invoked on this object

        @type owner: basestring
        @param owner: request owner. "nobody" or some username.

        @type objects: list
        @param objects: list of objects being written

        @type req_source: basestring
        @param req_source: Source requesting this operation.

        @type method: basestring
        @param method: operation type. Defaults to upsert.

        @type transaction_id: basestring
        @param transaction_id: transaction id for end-end tracing.

        @return: None, throws exceptions on errors
        """
        self._validate_triples(owner, objects, transaction_id)
        # Can add more validation later
def _create_filtered_entity(entity):
"""
Create a new entity based on given entity.
@type entity: dict
@param objects: dict of entity. An example:
{
title: "entityA",
_key: "49156d54-ea95-4d41-990b-6f62d700afd3",
identifier: {
values: ['entityA', 'b']
....
}
@rtype: dict
@return: A new dict that has _key, title, identifier.values
"""
result_entity = {'_key': entity['_key'],
'title': entity['title'],
'identifier.values': entity.get('identifier', {}).get('values', [])}
return result_entity
def _get_entity_from_identifier_key(owner, itsi_entity_obj, entity_key, entity_identifier, fields, logger):
    """
    Resolve entity given entity key or identifier.

    @type owner: basestring
    @param owner: owner making the request

    @type itsi_entity_obj: object
    @param itsi_entity_obj: ItsiEntity object for query KV Store

    @type entity_key: basestring
    @param entity_key: key for an entity

    @type entity_identifier: basestring
    @param entity_identifier: identifier for an entity

    @type fields: list
    @param fields: list of fields to retrieve from KV Store

    @type logger: object
    @param logger: The logger to use

    @rtype: dict
    @return: The resolved entity, reduced to the graph fields.
        Raises ItoaValidationError unless exactly one record matches.
    """
    # Build an OR filter over whichever lookup handles were supplied.
    or_clauses = []
    if entity_key:
        or_clauses.append({'_key': entity_key})
    if entity_identifier:
        or_clauses.append({'identifier.values': entity_identifier})
    filter_data = {'$or': or_clauses}

    # Query KV Store for matching entities.
    matching_entities = itsi_entity_obj.get_bulk(owner, filter_data=filter_data, fields=fields)
    logger.debug('Query entities using filter_data=%s, fields=%s. The result is: %s',
                 filter_data, fields, matching_entities)

    # Resolution must be unambiguous: zero or multiple matches are both errors.
    common_error_message = 'Must provide entity identifier or key in order to resolve to a unique existing entity'
    if not matching_entities:
        message = 'There is no matching entity. {}'.format(common_error_message)
        logger.error(message)
        raise ItoaValidationError(message, logger)
    if len(matching_entities) > 1:
        message = 'There are more than one matching entity. {}'.format(common_error_message)
        logger.error(message)
        raise ItoaValidationError(message, logger)

    # Reduce the single match to the fields the graph traversal needs.
    match_entity = _create_filtered_entity(matching_entities[0])
    logger.debug('Resolve entity_identifier=%s, entity_key=%s. The result is: %s',
                 entity_identifier, entity_key, match_entity)
    return match_entity
def _prepare_nodes_edges_result(results, edges_set, entity_key_index):
"""
Prepare nodes and edges in right format from BFS result.
@type results: list[dict, [list[dict]], ...]
@param results: list of nodes in level order
@type edges_set: set
@param edges_set: A set of edges
@type logger: object
@param logger: The logger to use
@rtype: dict
@return: A dict that has nodes, edges
"""
def prepare_nodes_result():
current_level = 0
new_results = []
new_results_set = set()
def update_node_dict(node, level):
if node.get('_key') not in new_results_set:
node.pop('identifier.values', None)
node.update({'level': level})
new_results.append(node)
new_results_set.add(node.get('_key'))
for each_result in results:
if isinstance(each_result, dict):
update_node_dict(each_result, current_level)
current_level += 1
elif isinstance(each_result, list):
for one_item in each_result:
if isinstance(one_item, dict):
update_node_dict(one_item, current_level)
current_level += 1
return new_results
def prepare_edges_result():
new_results = [dict(x) for x in edges_set]
for each_result in new_results:
if isinstance(each_result, dict):
each_result.update({
'subject_title': entity_key_index.get(each_result.get('subject_key')).get('title'),
'object_title': entity_key_index.get(each_result.get('object_key')).get('title')
})
return new_results
final_result = {'nodes': prepare_nodes_result(), 'edges': prepare_edges_result()}
return final_result
def _get_neighbors_one_level(owner, itsi_entity_obj, itsi_entity_relationship_obj, current_entity,
                             entity_fields, entity_relationship_fields,
                             entity_key_index, entity_title_index, entity_identifier_value_index,
                             logger):
    """
    Get neighbors for a given entity at one level distance. It is represented as a list of nodes with key only,
    and a list of edges.

    @type owner: basestring
    @param owner: owner making the request

    @type itsi_entity_obj: object
    @param itsi_entity_obj: ItsiEntity object for query KV Store

    @type itsi_entity_relationship_obj: object
    @param itsi_entity_relationship_obj: ItsiEntityRelationship object for query KV Store

    @type current_entity: dict
    @param current_entity: An entity to get its neighbors at one level

    @type entity_fields: list
    @param entity_fields: list of entity fields to retrieve from KV Store

    @type entity_relationship_fields: list
    @param entity_relationship_fields: list of entity relationship fields to retrieve from KV Store

    @type entity_key_index: dict
    @param entity_key_index: index that has entity's key as key and entity itself as value; updated in place

    @type entity_title_index: dict
    @param entity_title_index: index that has entity's title as key and entity's key as value; updated in place

    @type entity_identifier_value_index: dict
    @param entity_identifier_value_index: index that has entity's identifier value as key and entity's key as
        value; updated in place

    @type logger: object
    @param logger: The logger to use

    @rtype: tuple(list, list)
    @return: A list of nodes with key only, and a list of edges.
    """
    def get_entity_relationship_for_entity():
        # First build filter before query entity relationship KV Store: match
        # any relationship whose subject or object equals one of the current
        # entity's identifier values.
        filters = []
        for current_key, current_value in current_entity.items():
            if current_key == 'identifier.values':
                for v in current_value:
                    filters.append({'subject_identifier': v})
                    filters.append({'object_identifier': v})
        entity_relationship_filter_data = {'$or': filters}
        logger.debug('Build entity relationship filter data: entity_relationship_filter_data=%s',
                     entity_relationship_filter_data)

        # Query entity relationship KV Store
        matching_records = \
            itsi_entity_relationship_obj.get_bulk(owner, filter_data=entity_relationship_filter_data,
                                                  fields=entity_relationship_fields)
        logger.debug('Query entity relationship for current_entity=%s. The result is: %s',
                     current_entity, matching_records)
        return matching_records

    def get_entities_for_identifiers():
        # Build a set of all identifiers from matching entity relationships records
        all_identifiers_set = set()
        for matching_one in matching_entity_relationships:
            all_identifiers_set.add(matching_one.get('subject_identifier'))
            all_identifiers_set.add(matching_one.get('object_identifier'))

        # Build filter before query entity KV Store for the identifiers.
        # Even if a identifier might exist in entity_title_index or entity_identifier_value_index,
        # we still need query it in order to find if there is any duplicate identifier.
        identifiers_to_resolve = list(all_identifiers_set)
        entity_filters = []
        for identifier in identifiers_to_resolve:
            entity_filters.append({'identifier.values': identifier})
        entity_filter_data = {'$or': entity_filters}

        # Query entity KV Store for these identifiers
        matching_records = itsi_entity_obj.get_bulk(owner, filter_data=entity_filter_data,
                                                    fields=entity_fields)
        logger.debug('Query entity for identifiers=%s. The result is: %s',
                     identifiers_to_resolve, matching_records)
        return matching_records

    def build_indexes_for_entities():
        # Build/update entity_key_index, entity_title_index, entity_identifier_value_index from matching entity
        # records
        duplicate_identifiers_set = set()
        for matching_one in matching_entities:
            entity_key = matching_one.get('_key')
            entity_title = matching_one.get('title')
            if entity_key not in entity_key_index:
                entity_key_index.update({entity_key: _create_filtered_entity(matching_one)})
            if entity_title not in entity_title_index:
                entity_title_index.update({entity_title: entity_key})
            identifier_values = matching_one.get('identifier', {}).get('values', [])
            for identifier_value in identifier_values:
                if identifier_value not in entity_identifier_value_index:
                    entity_identifier_value_index.update({identifier_value: entity_key})
                elif identifier_value in entity_identifier_value_index and \
                        entity_identifier_value_index.get(identifier_value) != entity_key:
                    # If identifier already exists in the index but key is different, we run into duplicated
                    # identifier situation and will error out in the end.
                    existing_entity_key = entity_identifier_value_index.get(identifier_value)
                    duplicate_identifiers_set.add((identifier_value, entity_key))
                    duplicate_identifiers_set.add((identifier_value, existing_entity_key))
        logger.debug('Build the following indexes: entity_key_index=%s, entity_title_index=%s,'
                     'entity_identifier_value_index=%s',
                     entity_key_index, entity_title_index, entity_identifier_value_index)
        if len(duplicate_identifiers_set) > 0:
            message = 'Duplicate identifiers with keys found among %s' % list(duplicate_identifiers_set)
            logger.error(message)
            raise ITOAError(status='500', message=message)

    def dedupe_entity_relationship():
        # Convert matching_entity_relationships from subject_identifier and object_identifier to the
        # corresponding keys and de-dupe them.
        for matching_one in matching_entity_relationships:
            # Resolve subject_identifier to subject_key
            subject_identifier = matching_one.get('subject_identifier')
            subject_key = None
            if subject_identifier in entity_title_index:
                subject_key = entity_title_index.get(subject_identifier)
            elif subject_identifier in entity_identifier_value_index:
                subject_key = entity_identifier_value_index.get(subject_identifier)

            # Resolve object_identifier to object_key
            object_identifier = matching_one.get('object_identifier')
            object_key = None
            if object_identifier in entity_title_index:
                object_key = entity_title_index.get(object_identifier)
            elif object_identifier in entity_identifier_value_index:
                # BUG FIX: this membership test previously checked subject_identifier,
                # which left object_key as None (silently dropping the edge) whenever
                # the object resolved only through identifier values.
                object_key = entity_identifier_value_index.get(object_identifier)

            new_entity_relationship = {'subject_key': subject_key,
                                       'object_key': object_key,
                                       'predicate': matching_one.get('predicate')
                                       }
            if tuple(new_entity_relationship.items()) not in new_entity_relationship_set:
                new_entity_relationship_set.add(tuple(new_entity_relationship.items()))
        logger.debug('The set of entity relationships after de-dupe: %s', new_entity_relationship_set)

    def normalize_entity_relationship():
        # Then normalize the set of entity relationships.
        # For entity relationship that has predicate as hosts or hostedBy, will add its pair if missing
        normalize_predicates = ['hosts', 'hostedBy']
        for matching_one_tuple in new_entity_relationship_set:
            matching_one = dict(matching_one_tuple)
            predicate = matching_one.get('predicate')
            if predicate in normalize_predicates:
                # The inverse edge swaps subject and object and flips the predicate.
                new_entity_relationship = {'subject_key': matching_one.get('object_key'),
                                           'object_key': matching_one.get('subject_key')
                                           }
                new_predicate = 'hosts' if predicate == 'hostedBy' else 'hostedBy'
                new_entity_relationship.update({'predicate': new_predicate})
                new_tuple = tuple(new_entity_relationship.items())
                if new_tuple not in new_entity_relationship_set:
                    additional_pair_set.add(new_tuple)
        logger.debug('The set of entity relationships after normalization: %s', additional_pair_set)

    def prepare_nodes_edges_from_entity_relationship():
        # Create the set of nodes that only has key, and the set of entity relationships.
        # Edges with an unresolved endpoint (None key) are excluded.
        nodes_key_set = set()
        edges_set = set()
        for matching_one in total_entity_relationship_set:
            new_entity = dict(matching_one)
            if new_entity.get('subject_key') is not None:
                nodes_key_set.add(new_entity.get('subject_key'))
            if new_entity.get('object_key') is not None:
                nodes_key_set.add(new_entity.get('object_key'))
            if new_entity.get('subject_key') is not None and new_entity.get('object_key') is not None:
                edges_set.add(matching_one)
        nodes_key = list(nodes_key_set)
        edges = [dict(x) for x in edges_set]
        logger.debug('Get nodes with key: %s. Get edges: %s', nodes_key, edges)
        return nodes_key, edges

    # Get all matching entity relationships that start or from any identifier value of current entity
    matching_entity_relationships = get_entity_relationship_for_entity()

    # For all unique identifiers in these entity relationships, get matching entities
    matching_entities = get_entities_for_identifiers()

    # Build indexes from matching entities
    build_indexes_for_entities()

    # Dedupe and normalize the matching entity relationships
    new_entity_relationship_set = set()
    additional_pair_set = set()
    dedupe_entity_relationship()
    normalize_entity_relationship()
    total_entity_relationship_set = new_entity_relationship_set.union(additional_pair_set)
    logger.debug('The total set of entity relationships: %s', total_entity_relationship_set)

    return prepare_nodes_edges_from_entity_relationship()
def _get_neighbors_on_level_order(owner, itsi_entity_obj, itsi_entity_relationship_obj,
                                  start_node, entity_fields, entity_relationship_fields,
                                  level, max_count, logger):
    """
    Get neighbors for a given entity by doing Breadth first search (BFS).
    The result is represented as a list of nodes with key only, and a list of edges.

    @type owner: basestring
    @param owner: owner making the request

    @type itsi_entity_obj: object
    @param itsi_entity_obj: ItsiEntity object for query KV Store

    @type itsi_entity_relationship_obj: object
    @param itsi_entity_relationship_obj: ItsiEntityRelationship object for query KV Store

    @type start_node: dict
    @param start_node: An entity to get its neighbors

    @type entity_fields: list
    @param entity_fields: list of entity fields to retrieve from KV Store

    @type entity_relationship_fields: list
    @param entity_relationship_fields: list of entity relationship fields to retrieve from KV Store

    @type level: Int
    @param level: The distance from start_node for neighbors

    @type max_count: Int
    @param max_count: The max number of edges allowed

    @type logger: object
    @param logger: The logger to use

    @rtype: dict
    @return: A dict that has nodes, edges, level, max_count, complete
    """
    # Dict using entity's key as key and entity itself as value
    entity_key_index = {}
    # Dict using entity's title as key and entity's key as value
    entity_title_index = {}
    # Dict using entity's identifier as key and entity's key as value
    entity_identifier_value_index = {}
    # Keys of nodes already visited by the BFS
    nodes_key_set = set()
    # Unique edges collected so far (tuples of edge-dict items)
    edges_set = set()
    # Visited nodes, one list per BFS level, in level order
    results = []
    current_level_nodes = [start_node]
    visited_level = 0
    # Truncation flags: set to False when the walk stops due to the level
    # limit or the max_count edge limit respectively.
    is_complete_on_level = True
    is_complete_on_edges = True
    while current_level_nodes:
        logger.debug('Start visiting current_level_nodes: current_level_nodes=%s, visited_level=%s,'
                     'nodes_key_set=%s, results=%s',
                     current_level_nodes, visited_level, nodes_key_set, results)

        # Check if we are done visiting the level specified
        if visited_level == level:
            # Record the frontier without expanding it further.
            results.append(current_level_nodes)
            is_complete_on_level = False
            logger.debug('End visiting current_level_nodes earlier because visited_level==level (=%s).'
                         'Append current_level_nodes=%s',
                         visited_level, current_level_nodes)
            break

        level_results = []
        next_level_nodes = []
        for current_node in current_level_nodes:
            # Visit current node
            level_results.append(current_node)
            # Mark it visited by adding it to nodes_key_set
            nodes_key_set.add(current_node.get('_key'))

            # Get its neighbors and edges one level away
            neighbor_nodes_keys, neighbor_edges = \
                _get_neighbors_one_level(owner, itsi_entity_obj, itsi_entity_relationship_obj,
                                         current_node, entity_fields, entity_relationship_fields,
                                         entity_key_index, entity_title_index, entity_identifier_value_index,
                                         logger)
            logger.debug('Get one-level neighbors for current_node=%s: '
                         'neighbor_nodes_keys=%s, neighbor_edges=%s',
                         current_node, neighbor_nodes_keys, neighbor_edges)

            # For each of its neighbor nodes, if not visited, add it to next_level_nodes to visit
            for neighbor_node_key in neighbor_nodes_keys:
                if neighbor_node_key not in nodes_key_set and neighbor_node_key in entity_key_index:
                    next_level_nodes.append(entity_key_index.get(neighbor_node_key))

            # For each of its neighbor edges, add it to edges_set to ensure uniqueness.
            # If the number of them is over max_count, break.
            for neighbor_edge in neighbor_edges:
                if len(edges_set) == max_count:
                    logger.debug(
                        'End visiting current_level_nodes earlier because number of relationship '
                        'equal to max_count(=%s)', max_count)
                    is_complete_on_edges = False
                    # NOTE(review): this only breaks the edge loop for the current
                    # node; remaining nodes at this level are still visited.
                    break
                edges_set.add(tuple(neighbor_edge.items()))

        results.append(level_results)
        current_level_nodes = next_level_nodes
        visited_level += 1
        logger.debug('End visiting current_level_nodes: next_level_nodes=%s, nodes_key_set=%s, results=%s',
                     next_level_nodes, nodes_key_set, results)

    # Prepare final result
    final_result = _prepare_nodes_edges_result(results, edges_set, entity_key_index)
    # Explain why the walk stopped, if it was truncated.
    msg = ''
    if is_complete_on_edges and not is_complete_on_level:
        msg = 'limited by level'
    elif is_complete_on_level and not is_complete_on_edges:
        msg = 'limited by max count'
    elif not is_complete_on_level and not is_complete_on_edges:
        msg = 'limited by level and max count'
    final_result.update({
        'start_entity': start_node,
        'level': level,
        'max_count': max_count,
        'complete': {
            'result': is_complete_on_level and is_complete_on_edges,
            'reason': msg
        }})
    logger.debug('_get_neighbors_on_level_order returns final_result=%s', final_result)
    return final_result
def get_neighbors(session_key, current_user, owner, kwargs, logger):
    """
    Get related entity relationships for a given entity.

    @type session_key: basestring
    @param session_key: The user's session key

    @type current_user: basestring
    @param current_user: The current user

    @type owner: basestring
    @param owner: owner making the request

    @type kwargs: dict
    @param kwargs: key word arguments extracted from request.
        Required: entity_identifier or entity_key
        Optional: level=1, max_count=100

    @type logger: object
    @param logger: The logger to use

    @rtype: dict
    @return: dict of related entity relationships with keys: start_entity,
        level, max_count, complete, nodes and edges. Each node carries
        {title, _key, level}; each edge carries {subject_key, subject_title,
        object_key, object_title, predicate}. 'complete' reports whether the
        traversal was truncated by the level or max_count limits.
    """
    logger.debug('get_neighbors is called with user=%s, owner=%s, kwargs=%s', current_user, owner, kwargs)

    # Pull the request parameters out of kwargs
    entity_identifier = kwargs.get('entity_identifier')
    entity_key = kwargs.get('entity_key')
    level = int(kwargs.get('level', 1))
    max_count = int(kwargs.get('max_count', 100))
    logger.debug('Parsed parameters entity_identifier=%s, entity_key=%s, level=%s, max_count=%s',
                 entity_identifier, entity_key, level, max_count)

    # Accessors and field projections used for the KV Store queries
    instrumentation = InstrumentCall(logger)
    entity_accessor = ItsiEntity(session_key, current_user)
    relationship_accessor = ItsiEntityRelationship(session_key, current_user)
    entity_fields = ['_key', 'identifier.values', 'identifier.fields', 'title']
    entity_relationship_fields = ['_key', 'subject_identifier', 'object_identifier', 'predicate']

    # Step 1. Resolve given entity key or entity identifier to a unique existing entity
    with instrumentation.track('itsi_entity_relationship._get_entity_from_identifier_key',
                               owner=owner) as transaction_id:
        start_node = _get_entity_from_identifier_key(
            owner, entity_accessor, entity_key, entity_identifier, entity_fields, logger)

    # Step 2. Do Breadth first search (BFS), starting from the above resolved entity.
    with instrumentation.track('itsi_entity_relationship._get_neighbors_on_level_order',
                               transaction_id=transaction_id, owner=owner):
        final_result = _get_neighbors_on_level_order(
            owner, entity_accessor, relationship_accessor, start_node,
            entity_fields, entity_relationship_fields, level, max_count, logger)

    return final_result