# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
"""
default_content_delta - this module handles the upgrading of default content through a series of
deltas **for default contents that are editable (e.g. itsi_entity_type)**.

The reasoning behind having such a framework plus its design is documented at this ERD:
https://confluence.splunk.com/display/ITOA/Delta-based+OOTB+content+shipping+system

As of writing (09/14/2020), this module only implements the part for handling the migration of
default objects. Handling of default object initialization for fresh ITSI installation is still
handled by the data loader process (configure_itsi.py).
"""
import os
import json
import copy

from ITOA.setup_logging import getLogger

logger = getLogger()


class Delta(object):
    """
    Delta - represents a delta for a single object type between two versions.
    Each delta contains a `description` and a list of `DeltaOperation`s.
    """

    def __init__(self, description, ops):
        """
        @param description: A human readable string describing the operations being applied
        @type description: string

        @param ops: A list of operations to apply
        @type ops: list of DeltaOperation
        """
        self.description = description
        self.ops = ops

    @classmethod
    def from_file(cls, filepath):
        """
        from_file - build a Delta from a JSON delta file

        The file must contain a JSON object with a `description` key and an `ops` key; each entry
        of `ops` is passed as keyword arguments to `DeltaOperation`.

        @param filepath: path of the JSON delta file
        @type filepath: string

        @return delta: delta described by the file
        @rtype delta: Delta
        """
        with open(filepath, 'rb') as f:
            data = json.load(f)
        ops = [DeltaOperation(**op) for op in data['ops']]
        return cls(data['description'], ops)

    def apply(self, base_state):
        """
        apply - Apply the delta to the base state and return a new state

        @param base_state: Base state to layer the delta on top of. Key of the base state are the
            key of the objects in the base state, with value being the object itself.
        @type base_state: dict

        @return new_state: updated state based on the delta
        @rtype new_state: dict
        """
        # Shallow copy is enough here: every operation returns a new object instead of mutating
        # the one it was given, so the caller's `base_state` and its objects are left untouched.
        new_state = copy.copy(base_state)
        for op in self.ops:
            base_object = new_state.get(op.object_key)
            # Operations targeting an object that is absent from (or empty in) the base state
            # are skipped.
            if base_object:
                new_state[op.object_key] = op.apply(base_object)
        return new_state


class DeltaOperation(object):
    """
    DeltaOperation - represents a single delta operation on a field of an object.
    """
    ALLOWED_OPS = ['UPSERT', 'DELETE']

    def __init__(self, object_type, object_key, op, field, value=None):
        """
        @param object_type: type of the object the operation targets
        @type object_type: string

        @param object_key: key of the object the operation targets
        @type object_key: string

        @param op: operation name, case-insensitive, one of `ALLOWED_OPS`
        @type op: string

        @param field: name of the field the operation acts on
        @type field: string

        @param value: value to set for an UPSERT; not used by DELETE, hence optional
        @type value: any JSON-compatible value

        @raise ValueError: if `op` is not one of `ALLOWED_OPS`
        """
        self.object_type = object_type
        self.object_key = object_key
        op = op.upper()
        if op not in DeltaOperation.ALLOWED_OPS:
            raise ValueError('Invalid op: must be one of %s' % DeltaOperation.ALLOWED_OPS)
        self.op = op
        self.field = field
        self.value = value

    def apply(self, base_object):
        """
        apply - apply the delta operation on the `base_object`

        @param base_object: object to apply the operation to; it is not modified
        @type base_object: dict

        @return new_object: updated object after the operation is applied
        @rtype new_object: dict
        """
        logger.info('Applying delta operation %r' % self)
        if self.op == 'UPSERT':
            new_object = self._apply_upsert(base_object)
        elif self.op == 'DELETE':
            new_object = self._apply_delete(base_object)
        else:
            # Only reachable if `op` was reassigned after construction; leave the object as is.
            new_object = base_object
        return new_object

    def _apply_upsert(self, base_object):
        """
        Return a shallow copy of `base_object` with `field` set to a copy of `value`.
        """
        new_object = copy.copy(base_object)
        # Deep copy so the resulting object never aliases this operation's own value: mutating
        # the object afterwards must not alter the delta (or objects from other applications).
        new_object[self.field] = copy.deepcopy(self.value)
        return new_object

    def _apply_delete(self, base_object):
        """
        Return a shallow copy of `base_object` without `field`; a missing field is not an error.
        """
        new_object = copy.copy(base_object)
        if self.field in new_object:
            del new_object[self.field]
        return new_object

    def __repr__(self):
        return '%s(%s, %s, %s, %s)' % (
            self.__class__.__name__,
            self.object_type,
            self.object_key,
            self.op,
            self.field
        )


class ContentDeltaManager(object):
    """
    ContentDeltaManager - manages reading of the delta files and applying delta of different
    version to different base object
    """
    DEFAULT_DELTA_DIR = os.path.join(os.path.dirname(__file__), 'default_content_delta')

    @staticmethod
    def _version_sort_key(version_dir):
        """
        Return the numeric sort key of a version directory name ("1_2_3" -> [1, 2, 3]),
        or None when the name is not made of `_`-separated integers.
        """
        try:
            return [int(part) for part in version_dir.split('_')]
        except ValueError:
            return None

    def get_delta_versions(self):
        """
        get_delta_versions - get all delta versions found in the delta directory, sorted
        numerically in ascending order

        The versions are returned as their directory names (e.g. "1_2_3"), not in dotted form.
        Such a name can be passed as-is to the other methods of this class because
        `semver_to_version_dir` leaves it unchanged.

        @return versions
        @rtype versions: list
        """
        version_dirs = []
        for entry in os.listdir(ContentDeltaManager.DEFAULT_DELTA_DIR):
            # Anything that is not a version directory name (hidden files, caches, ...) would
            # otherwise make the numeric sort blow up with a ValueError.
            if self._version_sort_key(entry) is None:
                logger.info('Ignoring non-version entry %r in delta directory' % entry)
                continue
            version_dirs.append(entry)
        version_dirs.sort(key=self._version_sort_key)
        return version_dirs

    def get_deltas_for_version(self, version):
        """
        get_deltas_for_version - get the deltas for a specific version, key-ed by the object type

        Only `*.json` files of the version directory are considered; the object type is the file
        name without its `.json` extension.

        @param version: version of the deltas
        @type version: string

        @return delta_by_type: deltas of the version, key-ed by object type
        @rtype delta_by_type: dict
        """
        version_dir = self.semver_to_version_dir(version)
        version_path = os.path.join(ContentDeltaManager.DEFAULT_DELTA_DIR, version_dir)
        delta_by_type = {}
        for filename in os.listdir(version_path):
            object_type, extension = os.path.splitext(filename)
            if extension != '.json':
                # Not a delta file; `get_delta` would look for a file that does not exist.
                continue
            delta_by_type[object_type] = self.get_delta(version, object_type)
        return delta_by_type

    def get_delta(self, version, object_type):
        """
        get_delta - retrieves the delta for object of `object_type` at `version`

        @param version: version of the delta (in semver format)
        @type version: string

        @param object_type: the object for which to apply the delta to, must match its kvstore
            collection name
        @type object_type: string

        @return delta
        @rtype delta: Delta
        """
        version_dir = self.semver_to_version_dir(version)
        delta_file = os.path.join(
            ContentDeltaManager.DEFAULT_DELTA_DIR, version_dir, '%s.json' % object_type
        )
        return Delta.from_file(delta_file)

    def apply_delta(self, object_type, base_state, version):
        """
        apply_delta - apply delta for `object_type` at `version` to `base_state`

        @param object_type: the object for which to apply the delta to, must match its kvstore
            collection name
        @type object_type: string

        @param base_state: Base state to layer the delta on top of. Key of the base state are the
            key of the objects in the base state, with value being the object itself.
        @type base_state: dict

        @param version: version of the delta (in semver format)
        @type version: string

        @return new_state: updated state based on the delta
        @rtype new_state: dict
        """
        delta = self.get_delta(version, object_type)
        logger.info('Applying version %s delta on %s objects. Delta description: %s' % (
            version, object_type, delta.description
        ))
        new_state = delta.apply(base_state)
        return new_state

    @staticmethod
    def semver_to_version_dir(version):
        """
        semver_to_version_dir - convert semver version string to version directory string format
        e.g. "1.2.3" -> "1_2_3"

        @param version: version in semver format
        @type version: string

        @return version_dir
        @rtype version_dir: str
        """
        return version.replace('.', '_')