# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import json

from .service_collection import fetch_services
from .utils import get_nested


class DataSourceConverter(object):
    """
    Converts the search-related part of a GT widget into a UDF datasource object.

    A widget that references a KPI (through an asset id) is mapped to a summary
    index search for that KPI; any other widget is treated as ad hoc and uses
    the search string stored in its ``statModel``.
    """

    def __init__(self, session_key, services=None):
        """
        :param session_key: session key passed to fetch_services when no
            services are supplied
        :param services: optional list of service objects (each expected to
            carry '_key', 'title' and 'kpis'); when empty or omitted, the
            services are fetched and decoded from the response content
        """
        self.item = None
        if services:
            self.services = services
        else:
            # All exceptions are handled at gt_transformer level
            response = fetch_services(session_key)
            self.services = json.loads(response['content'])

    def convert(self, item, ds_id):
        """
        Extracts info related to searches and produces a datasource object

        :param item: original widget data
        :param ds_id: datasource id
        :return: UDF datasource object as ``{ds_id: data_source}``, or None
            when the widget's search source is not 'adhoc'
        """
        self.item = item
        searchSource = get_nested(self.item, ['statModel', 'searchSource'])
        if searchSource != 'adhoc':
            # TODO: datamodel is not supported yet
            return None

        meta = None
        asset_id = get_nested(self.item, ['statModel', 'assetId'])
        ds_service_id = self.item.get('parent')
        if ds_service_id:
            # For classic GTs, service_id is stored as 'parent' attribute.
            # Rectangle/Ellipse/Icon widgets do not have an assetId in
            # statModel, so use the one outside of statModel.
            asset_id = self.item.get('assetId')

        if asset_id:
            ds = self.find_KPI(asset_id)
            if ds:
                # this is a KPI
                ds_kpi_id = ds['kpi']['_key']
                ds_service_id = ds['service']['_key']
                ds_name = ds['service']['title'] + ' - ' + ds['kpi']['title']
            else:
                # referenced KPI not found (no team permission or GT out of sync with services)
                # Convert old attributes to not break GT for non-restricted users
                # - Users can clean up on their own (for cases of out-of-sync GTs)
                ds_kpi_id = asset_id
                # The service id is None when the asset id came from statModel
                # and the widget has no 'parent'; the label may be absent too.
                # Build the name from whatever is available instead of raising.
                name_parts = [
                    part
                    for part in (ds_service_id, self.item.get('label'))
                    if part
                ]
                ds_name = ' - '.join(name_parts) if name_parts else ds_kpi_id

            # Create summary index search from the datasource's kpi_id
            ds_query = '`get_full_itsi_summary_kpi(' + \
                ds_kpi_id + \
                ')` `service_level_kpi_only` ' \
                '| timechart cont=false latest(alert_value) AS alert_value, latest(alert_color) AS alert_color'
            meta = {
                'kpiID': ds_kpi_id,
                'serviceID': ds_service_id
            }
        else:
            # this is ad hoc
            ds_name = self.item['id']
            ds_query = self.get_search_string()

        data_source = {
            'name': ds_name,
            'type': 'ds.search',
            'options': {
                'query': ds_query
            }
        }
        if meta:
            data_source['meta'] = meta

        return {
            ds_id: data_source
        }

    def find_KPI(self, id):
        """
        Looks up a KPI by key across the known services.

        :param id: KPI '_key' to search for
        :return: ``{'service': <service>, 'kpi': <kpi>}`` for the first match,
            or None when id is empty, no services are loaded, or nothing matches
        """
        if not id or not self.services:
            return None
        for svc in self.services:
            # A service without a 'kpis' entry simply has nothing to match.
            for kpi in svc.get('kpis') or []:
                if kpi.get('_key') == id:
                    return {'service': svc, 'kpi': kpi}
        return None

    def get_search_string(self):
        '''
        Returns the current search for this widget.

        When both a search and a threshold field are present in statModel, the
        threshold field is renamed to alert_value at the end of the search.
        '''
        search = get_nested(self.item, ['statModel', 'search'])
        field = get_nested(self.item, ['statModel', 'thresholdLabel'])
        if search and field:
            search = '%s | rename %s AS alert_value' % (search, field)
        return search