You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

214 lines
7.0 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
"""
default_content_delta - this module handles the upgrading of default content through a series of deltas **for default
contents that are editable (e.g. itsi_entity_type)**. The reasoning behind having such framework plus its design
is documented at this ERD: https://confluence.splunk.com/display/ITOA/Delta-based+OOTB+content+shipping+system
As of this writing (09/14/2020), this module only implements the migration of default objects.
Initialization of default objects for a fresh ITSI installation is still handled by the data loader
process (configure_itsi.py).
"""
import os
import json
import copy
from ITOA.setup_logging import getLogger
# Module-level logger obtained via ITOA.setup_logging.getLogger; used by the classes in this module.
logger = getLogger()
class Delta(object):
    """
    Delta - represents a delta for a single object type between two versions. Each delta contains a
    `description` and a list of `DeltaOperation`s.
    """

    def __init__(self, description, ops):
        """
        @param description: A human readable string describing the operations being applied
        @type description: string
        @param ops: The operations to apply, in the order they should be applied
        @type ops: list of DeltaOperation
        """
        self.description = description
        self.ops = ops

    @classmethod
    def from_file(cls, filepath):
        """
        from_file - build a delta from a JSON delta file. The file must contain a JSON object
        with a `description` entry and an `ops` entry; every item of `ops` is passed as keyword
        arguments to `DeltaOperation`.
        @param filepath: path of the JSON delta file to read
        @type filepath: string
        @return delta: the delta described by the file
        @rtype delta: Delta
        """
        with open(filepath, 'rb') as f:
            data = json.loads(f.read())
        ops = [DeltaOperation(**op) for op in data['ops']]
        return cls(data['description'], ops)

    def apply(self, base_state):
        """
        apply - Apply the delta to the base state and return a new state. Operations that target
        an object key which is absent from the state are skipped.
        @param base_state: Base state to layer the delta on top of. Key of the base state are the key of
        the objects in the base state, with value being the object itself.
        @type base_state: dict
        @return new_state: updated state based on the delta
        @rtype new_state: dict
        """
        # Shallow copy: the caller's mapping is left untouched; each operation returns its own
        # copy of the object it modifies.
        new_state = copy.copy(base_state)
        for op in self.ops:
            base_object = new_state.get(op.object_key)
            # Test for presence explicitly instead of relying on truthiness, so that an object
            # which exists but is currently empty still receives its operations.
            if base_object is not None:
                new_state[op.object_key] = op.apply(base_object)
        return new_state
class DeltaOperation(object):
    """
    DeltaOperation - one change to one field of one object: either set the field to a value
    (UPSERT) or remove the field (DELETE).
    """

    # Kept as a list: the validation error message below formats it with `%s`.
    ALLOWED_OPS = ['UPSERT', 'DELETE']

    def __init__(self, object_type, object_key, op, field, value):
        """
        @param object_type: type of the object the operation targets
        @param object_key: key of the object the operation targets
        @param op: operation name, case-insensitive; must be one of `ALLOWED_OPS`
        @type op: string
        @param field: name of the field to set or remove
        @param value: value stored in `field` by an UPSERT
        @raise ValueError: if `op` is not one of `ALLOWED_OPS`
        """
        self.object_type = object_type
        self.object_key = object_key
        normalized_op = op.upper()
        if normalized_op not in DeltaOperation.ALLOWED_OPS:
            raise ValueError('Invalid op: must be one of %s' % DeltaOperation.ALLOWED_OPS)
        self.op = normalized_op
        self.field = field
        self.value = value

    def apply(self, base_object):
        """
        apply - run this operation against `base_object` and hand back the result. The
        operation is logged before it is applied.
        @param base_object: the object to apply the operation on
        @return new_object: updated object after the operation is applied
        @rtype new_object: dict
        """
        logger.info('Applying delta operation %r' % self)
        if self.op == 'UPSERT':
            return self._apply_upsert(base_object)
        if self.op == 'DELETE':
            return self._apply_delete(base_object)
        # Any other op leaves the object as it is.
        return base_object

    def _apply_upsert(self, base_object):
        """Return a shallow copy of `base_object` with `field` set to `value`."""
        updated = copy.copy(base_object)
        updated[self.field] = self.value
        return updated

    def _apply_delete(self, base_object):
        """Return a shallow copy of `base_object` without `field` (no-op when it is absent)."""
        trimmed = copy.copy(base_object)
        trimmed.pop(self.field, None)
        return trimmed

    def __repr__(self):
        details = (self.object_type, self.object_key, self.op, self.field)
        return '%s(%s, %s, %s, %s)' % ((self.__class__.__name__,) + details)
class ContentDeltaManager(object):
    """
    ContentDeltaManager - manages reading of the delta files and applying delta of different
    version to different base object.

    Files are looked up as `DEFAULT_DELTA_DIR/<version_dir>/<object_type>.json`, where
    `<version_dir>` is the version with its dots replaced by underscores (e.g. "1_2_3").
    """

    DEFAULT_DELTA_DIR = os.path.join(os.path.dirname(__file__), 'default_content_delta')

    def get_delta_versions(self):
        """
        get_delta_versions - get all delta versions found under `DEFAULT_DELTA_DIR`, sorted
        numerically component by component (so "1_10_0" comes after "1_9_0").

        NOTE: the entries are returned as they are named on disk, i.e. in version directory
        format ("1_2_3") rather than dotted semver ("1.2.3"). `semver_to_version_dir` leaves
        such names unchanged, so they can be passed to the other methods as-is.
        @return versions: version directory names in ascending order
        @rtype versions: list
        @raise ValueError: if an entry name is not made of underscore-separated integers
        """
        all_version_dirs = sorted(os.listdir(ContentDeltaManager.DEFAULT_DELTA_DIR),
                                  key=lambda s: list(map(int, s.split('_'))))
        return all_version_dirs

    def get_deltas_for_version(self, version):
        """
        get_deltas_for_version - get the deltas for a specific version, key-ed by the object type.
        Only `*.json` entries of the version directory are considered; anything else is ignored.
        @param version: version of the deltas (in semver format)
        @type version: string
        @return delta_by_type: one Delta per object type that has a delta file for `version`
        @rtype delta_by_type: dict
        """
        version_dir = self.semver_to_version_dir(version)
        version_path = os.path.join(ContentDeltaManager.DEFAULT_DELTA_DIR, version_dir)
        delta_by_type = {}
        for filename in os.listdir(version_path):
            # splitext only strips the final extension, so an object type that itself contains
            # a dot is kept whole.
            object_type, extension = os.path.splitext(filename)
            if extension != '.json':
                # get_delta only ever reads `<object_type>.json`; other entries are not deltas.
                continue
            delta_by_type[object_type] = self.get_delta(version, object_type)
        return delta_by_type

    def get_delta(self, version, object_type):
        """
        get_delta - retrieves the delta for object of `object_type` at `version`
        @param version: version of the delta (in semver format)
        @type version: string
        @param object_type: the object for which to apply the delta to, must match its kvstore collection name
        @type object_type: string
        @return delta
        @rtype delta: Delta
        """
        version_dir = self.semver_to_version_dir(version)
        delta_file = os.path.join(ContentDeltaManager.DEFAULT_DELTA_DIR, version_dir, '%s.json' % object_type)
        return Delta.from_file(delta_file)

    def apply_delta(self, object_type, base_state, version):
        """
        apply_delta - apply delta for `object_type` at `version` to `base_state`
        @param object_type: the object for which to apply the delta to, must match its kvstore collection name
        @type object_type: string
        @param base_state: Base state to layer the delta on top of. Key of the base state are the key of
        the objects in the base state, with value being the object itself.
        @type base_state: dict
        @param version: version of the delta (in semver format)
        @type version: string
        @return new_state: updated state based on the delta
        @rtype new_state: dict
        """
        delta = self.get_delta(version, object_type)
        logger.info('Applying version %s delta on %s objects. Delta description: %s' % (
            version, object_type, delta.description
        ))
        new_state = delta.apply(base_state)
        return new_state

    @staticmethod
    def semver_to_version_dir(version):
        """
        semver_to_version_dir - convert semver version string to version directory string format
        e.g. "1.2.3" -> "1_2_3"
        @param version: version in semver format
        @type version: string
        @return version_dir
        @rtype version_dir: str
        """
        return version.replace('.', '_')