Pushed by: unknown_user
Timestamp: 2026-01-25T19:54:31.827846
masterdev
Splunk Git Pusher 3 months ago
parent 02ea0b81e7
commit 35e711853e

@ -0,0 +1,22 @@
[outputtelemetry]
param.support = <bool>
* Whether data is part of support usage
param.anonymous = <bool>
* Whether data is part of anonymous usage
param.license = <bool>
* Whether data is part of license usage
param.optinrequired = <int>
* Opt-in level required
param.component = <string>
* Component name
param.input = <string>
* Input name that has the JSON payload
param.type = [aggregate|event]
* Data type

@ -0,0 +1,28 @@
# Output to Telemetry alert settings
action.outputtelemetry = [0|1]
* Enable output to telemetry action
action.outputtelemetry.param.support = <bool>
* Whether data is part of support usage
* (optional)
action.outputtelemetry.param.anonymous = <bool>
* Whether data is part of anonymous usage
* (optional)
action.outputtelemetry.param.license = <bool>
* Whether data is part of license usage
* (optional)
action.outputtelemetry.param.optinrequired = <int>
* Opt-in level required
* (required)
action.outputtelemetry.param.component = <string>
* Component name
* (required)
action.outputtelemetry.param.type = [aggregate|event]
* Data type
* (optional)

@ -0,0 +1,24 @@
# This file contains possible attributes and values for configuring global
# telemetry settings. Please note that enabling these settings would enable
# apps to collect telemetry data about app usage and other properties.
#
# There is no global, default telemetry.conf. Instead, a telemetry.conf may
# exist in each app in Splunk Enterprise.
#
# To learn more about configuration files (including precedence) please see
# the documentation located at
# http://docs.splunk.com/Documentation/Splunk/latest/Admin/Aboutconfigurationfiles
[general]
sendLicenseUsage = false
sendAnonymizedUsage = false
sendAnonymizedWebAnalytics = false
precheckSendAnonymizedUsage = false
precheckSendLicenseUsage = true
showOptInModal = true
deprecatedConfig = false
scheduledHour = 16
reportStartDate = 2017-10-27
scheduledDay = 4
bufferFlushTimeout = 600
onCloudInstance = false

@ -0,0 +1,128 @@
# This file contains possible attributes and values for configuring global
# telemetry settings. Please note that enabling these settings would enable
# apps to collect telemetry data about app usage and other properties.
#
# There is no global, default telemetry.conf. Instead, a telemetry.conf may
# exist in each app in Splunk Enterprise.
#
# To learn more about configuration files (including precedence) please see
# the documentation located at
# http://docs.splunk.com/Documentation/Splunk/latest/Admin/Aboutconfigurationfiles
# GLOBAL SETTINGS
# Use the [default] stanza to define any global settings.
# * You can also define global settings outside of any stanza, at the top
# of the file.
# * Each conf file should have at most one default stanza. If there are
# multiple default stanzas, attributes are combined. In the case of
# multiple definitions of the same attribute, the last definition in the
# file wins.
# * If an attribute is defined at both the global level and in a specific
# stanza, the value in the specific stanza takes precedence.
[general]
optInVersion = <number>
* An integer that identifies the set of telemetry data to be collected
* Incremented upon installation if the data set collected by Splunk has changed
* This field was introduced for version 2 of the telemetry data set. So,
when this field is missing, version 1 is assumed.
* Should not be changed manually
optInVersionAcknowledged = <number>
* The latest optInVersion acknowledged by a user on this deployment
* While this value is less than the current optInVersion, a prompt for
data collection opt-in will be shown to users with the
edit_telemetry_settings capability at login
* Once a user confirms interaction with this login - regardless of
opt-in choice - this number will be set to the value of optInVersion
* This gets set regardless of whether the user opts in using the opt-in
dialog or the Settings > Instrumentation page
* If manually decreased or deleted, then a user that previously acknowledged
the opt-in dialog will not be shown the dialog the next time they log in
unless the related settings (dismissedInstrumentationOptInVersion and
hideInstrumentationOptInModal) in their user-prefs.conf are also changed.
* Unset by default
sendLicenseUsage = true|false
* Send the licensing usage information of splunk/app to the app owner
* Defaults to true
sendAnonymizedUsage = true|false
* Send the anonymized usage information about various categories like
infrastructure, utilization, etc., of splunk/app to Splunk, Inc
* Defaults to true
sendSupportUsage = true|false
* Send the support usage information about various categories like
infrastructure, utilization, etc., of splunk/app to Splunk, Inc
* Defaults to false
sendAnonymizedWebAnalytics = true|false
* Send the anonymized usage information about user interaction with
splunk performed through the web UI
* Defaults to true
precheckSendLicenseUsage = true|false
* Default value for sending license usage in opt in modal
* Defaults to true
precheckSendAnonymizedUsage = true|false
* Default value for sending anonymized usage in opt in modal
* Defaults to true
precheckSendSupportUsage = true|false
* Default value for sending support usage in opt in modal
* Defaults to true
showOptInModal = true|false
* DEPRECATED - see optInVersion and optInVersionAcknowledged settings
* Shows the opt in modal. DO NOT SET! When a user opts in, it will
automatically be set to false to not show the modal again.
* Defaults to true
deploymentID = <string>
* A uuid used to correlate telemetry data for a single splunk
deployment over time. The value is generated the first time
a user opts in to sharing telemetry data.
deprecatedConfig = true|false
* Setting to determine whether the splunk deployment is following
best practices for the platform as well as the app
* Defaults to false
retryTransaction = <string>
* Setting that is created if the telemetry conf updates cannot be delivered to
the cluster master for the splunk_instrumentation app.
* Defaults to an empty string
swaEndpoint = <string>
* The URL to which swajs will forward UI analytics events
* If blank, swajs sends events to the Splunk MINT CDS endpoint.
* Blank by default
telemetrySalt = <string>
* A salt used to hash certain fields before transmission
* Autogenerated as a random UUID when splunk starts
scheduledHour = <number>
* Time of day, on a 24 hour clock, that the scripted input responsible for collecting telemetry data starts.
* The script begins at the top of the hour and completes, including running searches on the primary instance in your deployment, after a few minutes.
* Defaults to 3
scheduledDay = <string>
* Number representing the weekday on which telemetry data collection is executed
* 0 represents Monday
* Defaults to every day (*)
reportStartDate = <string>
* Start date for the next telemetry data collection
* Uses format YYYY-MM-DD
* Defaults to empty string
bufferFlushTimeout = <number>
* Timeout for buffer flush, number in seconds
* Defaults to 600s
onCloudInstance = true|false
* Whether the instance is on cloud or on prem
* Defaults to false

@ -0,0 +1,17 @@
# swa_injector.py — registers the SWA (web analytics) init-script renderer as
# a head-injection hook so instrumentation JS is injected into rendered pages.
import logging
import traceback
import cherrypy
from splunk_instrumentation.swa_injection_tool import SwaInitScriptRenderer
from splunk.appserver.mrsparkle.lib.htmlinjectiontoolfactory import HtmlInjectionToolFactory

logger = logging.getLogger(__name__)

try:
    # Build the renderer around the cherrypy module and hook it into the
    # HTML injection tool singleton (runs once, at import time).
    swaInitScriptRenderer = SwaInitScriptRenderer(cherrypy)
    HtmlInjectionToolFactory.singleton().register_head_injection_hook(swaInitScriptRenderer)
except Exception:
    # Log with the full traceback, then re-raise so the loader also sees the failure.
    logger.error('ERROR while loading swa_injector.py: ' + traceback.format_exc())
    raise

File diff suppressed because one or more lines are too long

Binary file not shown.

After

Width:  |  Height:  |  Size: 1007 B

@ -0,0 +1,41 @@
## Instrumentation page shell (Mako template).
## Renders the base HTML document, favicon/bookmark metadata, the i18n and
## config bootstrap scripts, then loads the built inst.js page bundle.
## Lines starting with "##" are Mako comments and are stripped from output.
<%!
theme = ''
import splunk
from splunk.appserver.mrsparkle.lib import util
faviconFile = util.getFaviconFileName()
%>\
<!doctype html>
<!--[if lt IE 7]> <html lang="${i18n.current_lang()[0]|h}" class="no-js ie lt-ie9 lt-ie8 lt-ie7"> <![endif]-->
<!--[if IE 7]> <html lang="${i18n.current_lang()[0]|h}" class="no-js ie7 lt-ie9 lt-ie8"> <![endif]-->
<!--[if IE 8]> <html lang="${i18n.current_lang()[0]|h}" class="no-js ie8 lt-ie9"> <![endif]-->
<!--[if IE 9]> <html lang="${i18n.current_lang()[0]|h}" class="no-js ie9"> <![endif]-->
<!--[if gt IE 9]><!--> <html lang="${i18n.current_lang()[0]|h}" class="no-js"> <!--<![endif]-->
<head>
<meta charset="utf-8" />
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<title>${_('Instrumentation | Splunk')}</title>
<meta name="description" content="listen to your data" />
<meta name="author" content="Splunk Inc." />
<meta name="viewport" content="width=1160, initial-scale=1.0" />
<link rel="shortcut icon" href="${make_url('/static/img/' + faviconFile)}" />
<link rel="apple-touch-icon-precomposed" sizes="57x57" href="${make_url('/static/img/bookmark/apple-icon-57x57-precomposed.png')}" />
<link rel="apple-touch-icon-precomposed" sizes="72x72" href="${make_url('/static/img/bookmark/apple-icon-72x72-precomposed.png')}" />
<link rel="apple-touch-icon-precomposed" sizes="114x114" href="${make_url('/static/img/bookmark/apple-icon-114x114-precomposed.png')}" />
<link rel="apple-touch-icon-precomposed" sizes="144x144" href="${make_url('/static/img/bookmark/apple-icon-144x144-precomposed.png')}" />
<meta name="msapplication-TileColor" content="#65A637">
<meta name="msapplication-TileImage" content="${make_url('/static/img/bookmark/ms-tileicon-144x144.png')}">
</head>
<body class="locale-${i18n.current_lang()[0]|h}">
## Shim: some browsers lack Element.append; fall back to appendChild.
<script>
document.body.append = document.body.append || document.body.appendChild;
</script>
<script src="${make_url('/config?autoload=1')}" crossorigin="use-credentials"></script>
<script src="${make_url('/static/js/i18n.js')}"></script>
<script src="${make_url('/i18ncatalog?autoload=1')}"></script>
<script>
__splunkd_partials__ = ${json_decode(splunkd)};
</script>
<script src="${make_url('/static/app/splunk_instrumentation/build/pages/inst.js')}"></script>
</body>
</html>

@ -0,0 +1,17 @@
# app/bin/
This is where you put any scripts you want to add to this app.
This is also the only directory from the app that splunk puts on the python
module lookup path, so any supporting libraries should live here as well.
instrumentation.py
------------------
This is the main entry point to run the instrumentation script. It is called from
the mod input with a token for splunkd, but you can run it directly with
env INST_MODE=DEV and it will use splunkrc.json for credentials:
INST_MODE=DEV python instrumentation.py

@ -0,0 +1,221 @@
# scripted inputs entry point
import os
import sys
import argparse
import datetime
import splunk_instrumentation.datetime_util as datetime_util
from time import sleep

'''
Command-line parsing and environment setup.
This must happen before splunk_instrumentation.constants is imported.
'''
parser = argparse.ArgumentParser()
parser.add_argument('--scheme', action='store_true')
parser.add_argument('-v', '--validate-arguments', action='store_true')
parser.add_argument('--no-collect', action='store_true', help='will not collect and index data')
parser.add_argument('--no-send', action='store_true', help='will not query _telemetry and send data')
parser.add_argument('-m', '--mode', default="INPUT", help='is required if not running from splunkd modular inputs')
parser.add_argument('--test-schema')
parser.add_argument('--log-level')
parser.add_argument('--username')
parser.add_argument('--password')
parser.add_argument('--execution-id')
parser.add_argument('--quickdraw-url', help='used to override the quickdraw-url')
parser.add_argument('--run-unscheduled', help='Run even if not scheduled', default=False)
parser.add_argument('--default-quickdraw', help='used to override the quickdraw-url response')
parser.add_argument('--start-date', help='first date to query, in YYYY-MM-DD format (defaults to yesterday)')
parser.add_argument('--stop-date', help='last date to query, in YYYY-MM-DD format (inclusive) (defaults to yesterday)')
parser.add_argument('--batch-num', help='batch number')
args = parser.parse_args()

# Configuration is done through environment variables. Convert command line
# values to environment variables. os.environ values must be strings, so
# the store_true boolean flags are stringified before assignment (assigning
# the raw bool raises TypeError).
if args.mode:
    os.environ['INST_MODE'] = args.mode
if args.no_collect:
    os.environ['INST_NO_COLLECT'] = str(args.no_collect)
if args.no_send:
    os.environ['INST_NO_SEND'] = str(args.no_send)
if args.test_schema:
    os.environ['INST_TEST_SCHEMA'] = args.test_schema
if args.log_level:
    os.environ['INST_DEBUG_LEVEL'] = args.log_level
if args.execution_id:
    os.environ['INST_EXECUTION_ID'] = args.execution_id
if args.quickdraw_url:
    os.environ['QUICKDRAW_URL'] = args.quickdraw_url
if args.default_quickdraw:
    os.environ['DEFAULT_QUICKDRAW'] = args.default_quickdraw
if args.username:
    os.environ['SPLUNK_USERNAME'] = args.username
if args.password:
    os.environ['SPLUNK_PASSWORD'] = args.password
if args.run_unscheduled:
    os.environ['RUN_UNSCHEDULE'] = args.run_unscheduled


# Routine to get the value of an input token
def get_key():
    # read everything from stdin
    config_str = sys.stdin.read()
    # stdin is just a token
    os.environ['INST_TOKEN'] = config_str.rstrip()


if not os.environ.get("SPLUNK_DB"):
    # BUGFIX: 'var' was previously concatenated onto SPLUNK_HOME without a
    # path separator ("$SPLUNK_HOMEvar"); join each path component instead.
    os.environ['SPLUNK_DB'] = os.path.join(os.environ.get('SPLUNK_HOME', ''), 'var', 'lib', 'splunk')

# the default mode is INPUT and is what scripted inputs uses and implies
# there is a token passed in to stdin.
if os.environ['INST_MODE'] == "INPUT":
    get_key()

# these imports include splunk_instrumentation.constants which needs to be
# imported after the environment variables above are set
from splunk_instrumentation.constants import SPLUNKRC, INST_PRE_EXECUTE_SLEEP, SPLUNKD_URI, BATCHES_PER_HOUR, BATCHES_MAX_SIZE  # noqa: E402
from splunk_instrumentation.service_bundle import ServiceBundle  # noqa: E402
from splunk_instrumentation.splunkd import Splunkd  # noqa: E402
from splunk_instrumentation.input import run_input  # noqa: E402
from splunk_instrumentation.report import report  # noqa: E402
def normalize_date_range_params(args, report_start_date):
    '''
    Normalize the date range used for data collection (mutates args).

    The start date falls back from args.start_date to reportStartDate to
    yesterday; the stop date falls back from args.stop_date to yesterday.

    :param args: parsed CLI arguments
    :param report_start_date: reportStartDate specified in telemetry.conf
    :return: None
    '''
    yesterday = datetime.date.today() - datetime.timedelta(days=1)
    if args.start_date:
        args.start_date = datetime_util.str_to_date(args.start_date)
    elif report_start_date:
        args.start_date = datetime_util.str_to_date(report_start_date)
    else:
        args.start_date = yesterday
    if args.stop_date:
        args.stop_date = datetime_util.str_to_date(args.stop_date)
    else:
        args.stop_date = yesterday
def validate_date_range(args):
    # SPL-153360 This can happen when the user has gone from no opt-in to some opt-in
    # on the same day of the scheduled collection, before the script has run. This is
    # due to the TelemetryHandler.cpp file, which detects the switch from no opt-in to
    # some opt-in and sets the reportStartDate to today.
    #
    # When the script finally runs, it has a default stop date of yesterday, but
    # reportStartDate sets the lower bound, which is today in that case. We do not
    # want to generate alarming error messages, so just log the occurrence and exit
    # gracefully.
    if args.start_date <= args.stop_date:
        return
    report.report('collection-canceled', {
        'reason': 'Start date is after stop date. No data to collect.',
        'start_date': args.start_date,
        'stop_date': args.stop_date
    })
    exit(0)
def should_input_run(telemetry_conf_service, batch_num):
    '''
    Compares the current time with scheduledDay and scheduledHour to
    determine whether the input should execute.

    :param telemetry_conf_service: Service for telemetry.conf
    :param batch_num: batch number of this invocation, or None to run all
    :return: True if the current time matches the schedule in telemetry.conf
    '''
    scheduled_day = telemetry_conf_service.content.get('scheduledDay')
    scheduled_hour = telemetry_conf_service.content.get('scheduledHour')
    # Compare day and hour to time now
    now = datetime.datetime.now()
    # We execute all savedsearches in batches [0, BATCHES_MAX_SIZE) over two
    # hours; verify whether the current batch number should run during
    # scheduledHour [0, BATCHES_PER_HOUR) or scheduledHour + 1
    # [BATCHES_PER_HOUR, BATCHES_MAX_SIZE).
    should_run = False
    if scheduled_day == '*' or scheduled_day == str(now.weekday()):
        if batch_num is not None:
            # Batches meant to run at scheduledHour have execute_hour 0;
            # batches meant to run at (scheduledHour + 1) have execute_hour 1.
            execute_hour = batch_num // BATCHES_PER_HOUR
            if scheduled_hour == str(now.hour) and execute_hour == 0:
                should_run = True
            elif (int(scheduled_hour) + 1) % 24 == now.hour and execute_hour == 1:
                # BUGFIX: wrap past midnight — for a 23:00 schedule the
                # second-hour batches must match hour 0, not the
                # impossible hour 24.
                should_run = True
        else:
            # batch num is not provided as part of this script invocation;
            # run all batches by default
            if scheduled_hour == str(now.hour):
                should_run = True
    report.report('schedule-data', {
        'schedule': {
            'day': scheduled_day,
            'hour': scheduled_hour
        },
        'now': {
            'day': str(now.weekday()),
            'hour': str(now.hour)
        },
        'batchNum': str(batch_num),
        'should_run': should_run
    })
    return should_run
def process_input_params(telemetry_conf_service, args):
    '''
    Processes input date range params and sets reportStartDate in telemetry.conf

    :param telemetry_conf_service: Service for telemetry.conf
    :param args: List of arguments passed to the scripted input
    :return: None
    '''
    report_start_date = telemetry_conf_service.content.get('reportStartDate')
    report.report('reportStartDate', report_start_date)
    normalize_date_range_params(args, report_start_date)
    validate_date_range(args)
    # update the 'reportStartDate' before triggering input.py ONLY if
    # 1. the batch num is not provided (we execute all batches by default) OR
    # 2. it is currently executing the last batch (BATCHES_MAX_SIZE - 1,
    #    since numbering starts from 0) OR
    # 3. it is an unscheduled invocation
    batch_num = get_batch_num(args)
    if (batch_num is None) or (batch_num == BATCHES_MAX_SIZE - 1) or os.environ.get('RUN_UNSCHEDULE'):
        reportStartDate = args.stop_date
        # isinstance (rather than an exact type() check) also covers
        # datetime.datetime values, which subclass datetime.date.
        if isinstance(reportStartDate, datetime.date):
            reportStartDate = reportStartDate.strftime('%Y-%m-%d')
        telemetry_conf_service.update({
            'reportStartDate': reportStartDate
        })
def get_batch_num(args):
    '''
    Return the --batch-num input argument as an int, or None when it is
    absent, empty, or the literal string "None".
    '''
    raw = args.batch_num
    if not raw or raw == "None":
        return None
    return int(raw)
# Routine to index data
def main():
    '''
    Entry point for the scripted input: authenticate to splunkd, then run
    the collection if the schedule (or RUN_UNSCHEDULE) says to.
    '''
    # DEV mode authenticates with the credentials from SPLUNKRC;
    # otherwise use the splunkd session token read from stdin earlier.
    if os.environ['INST_MODE'] == "DEV":
        splunkd = Splunkd(**SPLUNKRC)
    else:
        # Stagger start-up before hitting splunkd (INST_PRE_EXECUTE_SLEEP seconds).
        sleep(INST_PRE_EXECUTE_SLEEP)
        splunkd = Splunkd(token=os.environ['INST_TOKEN'], server_uri=SPLUNKD_URI)
    services = ServiceBundle(splunkd)
    telemetry_conf_service = services.telemetry_conf_service
    batch_num = get_batch_num(args)
    if os.environ.get('RUN_UNSCHEDULE') or should_input_run(telemetry_conf_service, batch_num):
        process_input_params(telemetry_conf_service, args)
        run_input({'start': args.start_date, 'stop': args.stop_date, 'batchNum': batch_num})
    else:
        # indicate to caller that input wasn't executed
        sys.exit(114)


# Script must implement these args: scheme, validate-arguments
main()
sys.exit(0)

@ -0,0 +1,185 @@
#!/usr/bin/env python
import sys
import json
import re
from distutils.util import strtobool
from splunk_instrumentation.splunklib.searchcommands import (
StreamingCommand,
Configuration,
Option,
dispatch
)
# Raw strings keep the backslashes literal for the regex engine; in a plain
# string literal '\-', '\.' and '\*' are invalid escape sequences (a
# SyntaxWarning on modern Python). The resulting string values are unchanged.
FIELD_NAME_REGEX = r"[_a-zA-Z0-9\-\. ]*"
# A field spec: name, optionally followed by "(type)" or "[type]" where the
# square brackets force the converted value into an array.
FIELDS_REGEX = r'([_a-zA-Z0-9\-\. \*]+)((?i:\((json|string|int|float|bool)\)|\[(json|string|int|float|bool)?\]))?'
ERROR_INVALID_FIELD_NAME = "Invalid field name %s. Fields must be expressed in the format '" + FIELDS_REGEX + "'."
ERROR_FIELD_PATH_CONFLICT = "Can't create field %s due to conflict."
def convert_field(val, field_type, default=None):
    '''
    Convert a raw field value to the requested type.

    Returns ``default`` when a forced conversion raises ValueError. For an
    unrecognized (AUTO) type the value is auto-detected: int first, then
    float, then returned unchanged as a string.
    '''
    try:
        if field_type == STRING:
            return val
        if field_type == INT:
            return int(val)
        if field_type == FLOAT:
            return float(val)
        if field_type == BOOL:
            return bool(strtobool(val))
        if field_type == JSON:
            return json.loads(val)
        # auto detect type if none is specified, defaults to string
        for candidate in (INT, FLOAT):
            detected = convert_field(val, candidate)
            if detected is not None:
                return detected
        return val
    except ValueError:
        # return default if forced conversion fails
        return default
def convert_list(vals, field_type):
    '''Convert every value in ``vals``, keeping the original value on failure.'''
    return [convert_field(val, field_type, default=val) for val in vals]
# Field type markers; AUTO triggers int/float auto-detection in convert_field.
AUTO = "AUTO"
JSON = "JSON"
INT = "INT"
STRING = "STRING"
FLOAT = "FLOAT"
BOOL = "BOOL"
@Configuration()
class MakeJsonCommand(StreamingCommand):
    '''
    Streaming search command that serializes selected fields of each event
    into a JSON document stored in the field named by ``output``.

    Field names may carry a type suffix, e.g. ``count(int)`` or
    ``tags[string]``; square brackets force the value into an array.
    Dotted field names ("a.b.c") become nested JSON objects.
    '''

    # Default when no fieldnames are given: accept every field, auto-typed.
    _validFields = [{
        "regex": re.compile(FIELD_NAME_REGEX),
        "type": AUTO,
        "forceArray": False
    }]

    output = Option(
        doc="""
        Name of field that contains the JSON output.
        """,
        require=True)

    def get_field_type(self, field):
        # Return (type, forceArray) from the first spec matching this field
        # name, or (None, None) when the field should be skipped.
        for validField in self._validFields:
            if validField["regex"].match(field):
                return validField["type"], validField["forceArray"]
        return None, None

    def get_json(self, data):
        # Build a nested dict for the event based on its dotted field paths.
        res = {}
        for k, v in data.items():
            field_type, force_array = self.get_field_type(k)
            if field_type:
                dotpath = k.split(".")
                # Setup the correct amount of nested dicts based on the dot path
                target = res
                for segment in dotpath[:-1]:
                    target.setdefault(segment, {})
                    target = target[segment]
                try:
                    if isinstance(v, list):
                        target[dotpath[-1]] = convert_list(v, field_type)
                    elif v is not None:
                        converted_val = convert_field(v, field_type, default=v)
                        target[dotpath[-1]] = [
                            converted_val] if force_array else \
                            converted_val
                    else:
                        target[dotpath[-1]] = None
                except TypeError:
                    # A scalar already occupies part of this dot path.
                    raise Exception(ERROR_FIELD_PATH_CONFLICT % k)
        return res

    def set_valid_fields(self):
        # Parse self.fieldnames into regex/type specs stored in
        # self._validFields; returns a list of error strings for bad specs.
        errors = []
        self._validFields = []
        for jsonField in self.fieldnames:
            match = re.search("^" + FIELDS_REGEX + "$", jsonField)
            if match and match.group(1):
                field_type = match.group(2)
                if field_type:
                    force_array = (
                        field_type[:1] == "[")  # force conversion to array
                    field_type = field_type[1:-1].upper()
                    if field_type == "":
                        field_type = AUTO
                else:
                    force_array = False
                    field_type = AUTO
                # Escape literal dots; '*' becomes a field-name wildcard.
                regex_pattern = match.group(1).replace(".", "\\.") \
                    .replace("*", FIELD_NAME_REGEX)
                self._validFields.append({
                    "type": field_type,
                    "forceArray": force_array,
                    "regex": re.compile("^" + regex_pattern + "$")
                })
            else:
                errors.append(ERROR_INVALID_FIELD_NAME % jsonField)
        if len(self._validFields) == 0:
            # No usable specs: fall back to accepting every field, auto-typed.
            self._validFields.append({
                "regex": re.compile(FIELD_NAME_REGEX),
                "type": AUTO,
                "forceArray": False
            })
        return errors

    def prepare(self):
        # Validate fieldname specs up front; abort the command on any error.
        errors = self.set_valid_fields()
        for error in errors:
            self.write_error(error)
        if len(errors) > 0:
            self.error_exit(Exception("Fieldname validation failed "
                                      "for makejson command."))

    def stream(self, results):
        # Serialize each event; tally per-message errors and report them once.
        error_counts = {}
        found_results = 0
        for res in results:
            found_results += 1
            try:
                json_val = self.get_json(res)
                res[self.output] = json.dumps(json_val)
            except Exception as e:
                # BUGFIX: Exception.message does not exist on Python 3;
                # key the error tally on str(e) instead.
                message = str(e)
                if message in error_counts:
                    error_counts[message] += 1
                else:
                    error_counts[message] = 1
                res[self.output] = "{}"
            yield res
        # report all errors
        for err in error_counts:
            if error_counts[err] > 0:
                self.write_error(
                    err + " (" + str(error_counts[err]) + " of " + str(
                        found_results) + " events)")
# Hand control to the splunklib search-command runner for this module.
dispatch(command_class=MakeJsonCommand, argv=sys.argv, module_name=__name__)

@ -0,0 +1,195 @@
'''
Synopsis:
splunk cmd python on_splunk_start.py
Usage:
Called on splunk start. Not intended to be called manually, except for testing.
"Managed" Variable Sync Strategy:
Managed variables like the deployment ID have complex lifecycles, and
require synchronization among multiple nodes in a splunk deployment.
This leads to their abstraction behind "manager" class interfaces.
The strategy for syncing them is as follows:
* On Splunk start (when this script is triggered):
** Pull (or "sync") whatever value is at the cluster master,
overwriting any local value.
*** Since configuring clustering often requires a splunk restart,
this provides immediate sync up when clustering is enabled.
** Call the getter for the managed value.
*** The getters are designed to generate a new value
if one does not yet exist (perhaps this is a new installation,
and there was no cluster master to read from).
* On read (when the value is required to perform a task, or to create an event)
** Only call the getter.
** Typically, it is looked up in the conf file, and created anew if needed.
** Special handling may apply, see the corresponding manager class.
** If a new value is created, it *must* be passed to the telemetry endpoint
to be persisted. Only this endpoint ensures replication to the cluster master.
* On nightly scripted input
** Again, the value is pulled from the cluster master if possible
** This ensures liveness in the system, such that disagreements about
these values can eventually be resolved by conforming to the choice
of a single node. Basically, each night, the last search head to
replicate a choice to the cluster master the prior day will have won,
and all search heads will agree.
** In case there is no cluster master, it's usually a single instance,
or the existing conf file replication is relied upon for syncing
the managed values.
'''
import sys
import time
import logging
from splunk_instrumentation.splunkd import Splunkd
from splunk_instrumentation.service_bundle import ServiceBundle
from splunk_instrumentation.deployment_id_manager import DeploymentIdManager
import splunk_instrumentation.constants as constants
from splunk_instrumentation.salt_manager import SaltManager
NAME = "InstrumentationInit"
logger = logging.getLogger(NAME)
class OnSplunkStart(object):
    '''
    Namespace for the one-time tasks run when splunk starts: waiting for
    the KV store, syncing the telemetry salt and deployment ID, and
    applying product-specific opt-in defaults. All methods are static.
    '''

    @staticmethod
    def wait_for_kv_store_started(services):
        '''
        Migration of the deployment ID from V1 of instrumentation
        requires waiting until the KV store is ready. We'll give
        it 5 minutes, then proceed without it.
        '''
        t_start = time.time()
        status = services.server_info_service.content.get('kvStoreStatus')
        # Poll every 10s while the KV store reports 'starting', up to 5 minutes.
        while status == 'starting' and (time.time() - t_start) < (5 * 60):
            time.sleep(10)
            services.server_info_service.fetch()
            status = services.server_info_service.content.get('kvStoreStatus')

    @staticmethod
    def initialize_salt(salt_manager):
        '''
        Create a new telemetry salt for this deployment, if needed.
        '''
        # Pull any existing salt from the cluster first; the getter is then
        # expected to generate a new salt when none exists (see module docstring).
        salt_manager.sync_with_cluster()
        salt_manager.get_salt()

    @staticmethod
    def initialize_deployment_id(services, deployment_id_manager):
        '''
        Creates a new deployment ID for this deployment, if needed.
        '''
        deployment_id_manager.sync_deployment_id()
        deployment_id = deployment_id_manager.get_deployment_id()
        prefix = deployment_id_manager.get_deployment_id_prefix()
        # Ensure the correct prefix is set given the current product type
        if prefix and not deployment_id.startswith(prefix + '-'):
            # Strip any other known product prefix before re-prefixing.
            stripped_deployment_id = deployment_id
            for possible_prefix in [p + '-' for p in constants.DEPLOYMENT_ID_PREFIXES]:
                if deployment_id.startswith(possible_prefix):
                    stripped_deployment_id = deployment_id[len(possible_prefix):]
                    break
            deployment_id_manager.set_deployment_id(
                prefix + '-' + stripped_deployment_id)

    @staticmethod
    def opt_in_for_cloud_instrumentation(services):
        '''
        Configures cloud instance instrumentation settings
        '''
        settings = {}
        current_opt_in_version = services.telemetry_conf_service.content.get('optInVersion')
        swa_endpoint_url = '/splunkd/__raw/servicesNS/nobody/splunk_instrumentation/telemetry-metric'
        # Being explicit about all opt-ins. Licensing's default
        # is changing to True for on-prem. Need to make sure it
        # remains disabled for cloud.
        # NOTE(review): the comment above says license sharing should remain
        # disabled for cloud, yet sendLicenseUsage is set to True below —
        # confirm which is intended.
        settings.update({
            'optInVersionAcknowledged': current_opt_in_version,
            'sendAnonymizedWebAnalytics': True,
            'sendAnonymizedUsage': True,
            'sendLicenseUsage': True,
            'onCloudInstance': True,
            'swaEndpoint': swa_endpoint_url
        })
        services.telemetry_conf_service.update(settings)

    @staticmethod
    def migrate_licensing_opt_in_default(services):
        '''
        Checks if the current opt-in values have been acknowledged
        by the user. If not, they are the previous defaults, so we
        can enable license data reporting as the new default.

        Note that we could not simply push a new default out in the
        default/telemetry.conf file, since splunkd will not write
        a value to local conf files if their value is the same as
        the default value. This means we would have risked changing
        explicit opt-outs to opt-ins without user permissions.
        '''
        # Check legacy showOptInModal field to see if a user has acked
        # the licensing opt-in/opt-out.
        if services.telemetry_conf_service.content.get('showOptInModal') == '0':
            return
        # Check the new optInVersionAcknowledged flag
        opt_in_version_acked = services.telemetry_conf_service.content.get('optInVersionAcknowledged')
        if opt_in_version_acked is not None and int(opt_in_version_acked) != 0:
            return
        # Neither mechanism shows an acknowledgement: apply the new default.
        services.telemetry_conf_service.update({'sendLicenseUsage': True})
def main(services,
         salt_manager,
         deployment_id_manager,
         OnSplunkStart):
    '''
    Run the start-up sequence: wait for the KV store, sync the salt and
    deployment ID, then apply cloud or on-prem opt-in defaults.
    (Collaborators are injected as parameters to ease testing.)
    '''
    OnSplunkStart.wait_for_kv_store_started(services)
    OnSplunkStart.initialize_salt(salt_manager)
    OnSplunkStart.initialize_deployment_id(services, deployment_id_manager)
    if services.server_info_service.is_cloud():
        OnSplunkStart.opt_in_for_cloud_instrumentation(services)
    else:
        # Cloud should never opt-in for license sharing,
        # so only apply the default on-prem
        OnSplunkStart.migrate_licensing_opt_in_default(services)
if __name__ == '__main__':
    try:
        # splunkd passes a session token on stdin when launching the script.
        token = sys.stdin.read().rstrip()
        splunkd = Splunkd(token=token, server_uri=constants.SPLUNKD_URI)
        services = ServiceBundle(splunkd)
        salt_manager = SaltManager(services)
        deployment_id_manager = DeploymentIdManager(
            services.splunkd,
            telemetry_conf_service=services.telemetry_conf_service,
            server_info_service=services.server_info_service
        )
        main(services, salt_manager, deployment_id_manager, OnSplunkStart)
    except Exception as e:
        # Best-effort: never block splunk start. Log the full traceback
        # (logger.error(e) previously discarded it) and still exit 0.
        logger.exception(e)
        exit(0)

@ -0,0 +1,218 @@
#!/usr/bin/env python
import sys
import json
from splunk_instrumentation.splunklib import binding
from splunk_instrumentation.splunklib.searchcommands import (
ReportingCommand,
Configuration,
validators,
Option,
dispatch
)
APP_NAME = "splunk_instrumentation"

# User-facing error strings; tallied per run and reported once each.
ERROR_INVALID_JSON = "Invalid JSON"
ERROR_ENDPOINT_401 = "Authentication error"
ERROR_ENDPOINT_404 = "Endpoint missing"
ERROR_ENDPOINT_405 = "Wrong method"
HTTP_ERRORS = {
    401: ERROR_ENDPOINT_401,
    404: ERROR_ENDPOINT_404,
    405: ERROR_ENDPOINT_405
}
TELEMETRY_REQUEST_RETRY_TIMES = 5

# The command should be used with all 3 of these options or none
options_combo = [
    "component",
    "type",
    "optinrequired"
]
visibility_options = [
    "anonymous",
    "license",
    "support"
]

# Global validator instances
BOOLEAN_VALIDATOR = validators.Boolean()
FIELDNAME_VALIDATOR = validators.Fieldname()
# Raw string: '\-' is an invalid escape sequence in a plain string literal
# (SyntaxWarning on modern Python); the pattern itself is unchanged.
STRING_VALIDATOR = validators.Match("Valid string", r"^[a-zA-Z0-9._\-]+$")
# NOTE(review): this pattern is unanchored — depending on how
# validators.Match applies it, values like "events" may also pass; confirm
# whether "^(event|aggregate)$" is intended.
TYPE_VALIDATOR = validators.Match("event or aggregate", "event|aggregate")
OPTIN_VALIDATOR = validators.Integer(1)
@Configuration()
class OutputTelemetryCommand(ReportingCommand):
input = Option(
doc='''
Name of field that contains telemetry endpoint payload.
''',
require=True, validate=FIELDNAME_VALIDATOR
)
optinrequired = Option(
doc='''
Minimum version of opt-in required by customer (e.g. 1 for Ivory, 2 for Kimono, 3 for Minty)
''',
require=False, validate=OPTIN_VALIDATOR
)
type = Option(
doc='''
Either "event" or "aggregate".
Aggregate data should be used for statistics aggregated over time,
whereas event data should be used for instantaneous data.
''',
require=False,
validate=TYPE_VALIDATOR
)
component = Option(
doc='''
A name given to the data, to describe its content.
''',
require=False
)
anonymous = Option(
doc='''
Whether or not data is categorized as Diagnostic.
''',
require=False, validate=BOOLEAN_VALIDATOR
)
license = Option(
doc='''
Whether or not data is categorized as License Usage.
''',
require=False, validate=BOOLEAN_VALIDATOR
)
support = Option(
doc='''
Whether or not data is categorized as Support Usage.
''',
require=False, validate=BOOLEAN_VALIDATOR
)
def __init__(self):
    super(OutputTelemetryCommand, self).__init__()
    # Populated in prepare() from the options the user explicitly supplied.
    self.visibility_options = {}
    self.options_combo = []
def prepare(self):
errors = []
# Do additional arg validation if explicit args are provided
self.visibility_options = {k: self.options[k].value for k in visibility_options if k in self.options
and self.options[k].value is not None}
self.options_combo = [o for o in options_combo if o in self.options and
self.options[o].value is not None]
missing_options = [o for o in options_combo if o not in self.options_combo]
if 0 < len(missing_options) < len(options_combo):
errors.append("When specifying component, type, or"
" optinrequired, all options must be specified. Missing: " +
','.join(missing_options) + '.')
for err in errors:
self.write_error(err)
# Don't try to execute the command if there are argument errors
if len(errors) > 0:
self.error_exit(ValueError("Argument validation failed "
"for outputtelemetry command."))
def reduce(self, results):
if self.input is None:
return
error_counts = {}
found_results = 0
for result in results:
found_results += 1
error = None
event_str = None
response = None
try:
event = json.loads(result[self.input])
if "data" not in event:
event = {"data": event}
for opt in self.options_combo:
event[opt] = getattr(self, opt)
# Fix up the casing for the endpoint
if "optinrequired" in event:
event["optInRequired"] = event["optinrequired"]
del event["optinrequired"]
if len(self.visibility_options) > 0:
event["visibility"] = [k for k in self.visibility_options if self.visibility_options[k] is True]
event_str = json.dumps(event)
response = self.make_telemetry_request(event_str)
except ValueError:
error = ERROR_INVALID_JSON
except binding.HTTPError as http_error:
if http_error.status == 429:
response, error = self.retry_telemetry_request(error, event_str)
else:
error = self.format_error_message(http_error)
if error is not None:
if error in error_counts:
error_counts[error] += 1
else:
error_counts[error] = 1
yield {
"event": event_str,
"telemetry_response":
response.body.readall().decode("utf-8") if response is not None else "",
"telemetry_send_status":
error if error is not None else "submitted"
}
for err , mes in error_counts.items():
if error_counts[err] > 0:
self.write_error(err + " (" + str(error_counts[err]) +
" of " + str(found_results) + " events)")
def retry_telemetry_request(self, error, event_str):
retry_count = TELEMETRY_REQUEST_RETRY_TIMES
while retry_count > 0:
try:
return self.make_telemetry_request(event_str), None
except binding.HTTPError as http_error:
if http_error.status == 429:
retry_count -= 1
continue
else:
return None, http_error.message
return None, error
def format_error_message(self, http_error):
if http_error.status in HTTP_ERRORS:
return HTTP_ERRORS[http_error.status]
if hasattr(http_error,'message'):
return http_error.message
return str(http_error)
def make_telemetry_request(self, event_str):
return self.service.request(
"/servicesNS/" + self._metadata.searchinfo.owner + "/" +
self._metadata.searchinfo.app + "/telemetry-metric",
method="POST",
headers=[('Content-Type', 'application/json')],
body=event_str,
owner=self._metadata.searchinfo.owner,
app=self._metadata.searchinfo.app
)
dispatch(OutputTelemetryCommand, sys.argv, sys.stdin, sys.stdout, __name__)

@ -0,0 +1,41 @@
import os
import sys
import json
import time
import splunk_instrumentation.constants as constants
PATH = os.sep.join([os.environ.get('SPLUNK_HOME'), 'var', 'run', 'diags'])
def list_files():
    '''Names of all .json metadata files currently in the diag directory.'''
    return [name for name in os.listdir(PATH) if name.endswith('.json')]
def filter_files(jsonList):
    '''
    Deletes any diag metadata file (and its companion tarball) whose
    'timeCreated' field is older than constants.MAX_DIAG_AGE seconds.

    :param jsonList: iterable of file names relative to PATH
    '''
    for file in jsonList:
        full_path = os.path.join(PATH, file)
        # Close the handle promptly instead of leaking an open file object.
        with open(full_path) as fh:
            data = json.load(fh)
        # if over 30 days, delete
        if (time.time() - data['timeCreated']) >= constants.MAX_DIAG_AGE:
            delete_file(full_path)
    return
def delete_file(path):
    '''
    Best-effort removal of a diag JSON metadata file and its companion
    tarball. Never raises; missing files are silently ignored.

    The original implementation returned from a ``finally`` block, which
    both swallowed all exceptions AND skipped deleting the .json file
    whenever the tarball was already gone. Each removal is now attempted
    independently.

    :param path: absolute path of the '.json' metadata file
    '''
    # Derive the tarball path by replacing the trailing 'json' suffix;
    # rsplit keeps directory components containing 'json' intact.
    tarball = path.rsplit('json', 1)[0] + 'tar.gz'
    for target in (tarball, path):
        try:
            os.remove(target)
        except OSError:
            # Preserve the original best-effort contract.
            pass
def main():
    '''Entry point: prune expired diag files, never raising to the caller.'''
    try:
        filter_files(list_files())
    finally:
        # Cleanup is strictly best-effort; swallow any error.
        return
# Script body: run the cleanup, then exit with success.
main()
sys.exit(0)

@ -0,0 +1,31 @@
'''
DEPRECATED
This script was originally used to set the deployment ID for cloud instances.
Now, all cloud configuration is handled in splunk_instrumentation/bin/on_splunk_start.py
This file remains to support legacy cloud provisioning logic, but is now a no-op.
The previous doc string follows below.
Synopsis:
splunk cmd python set_deployment_id.py -u SPLUNK_USER -p SPLUNK_PASSWORD --prefix PREFIX
Description:
Sets the deployment ID for the instrumentation app via CLI. Intended for
deployment automation purposes.
Calls splunkd via the splunk-sdk, utilizing the same logic that generates
the deployment ID in splunkweb during instrumentation opt-in via the web UI.
Expectations:
SPLUNK_USER must have edit_telemetry_settings capability (the default for admin).
'''
from __future__ import print_function
# Deprecated no-op: warn on stdout and exit 0 so legacy automation keeps working.
print("set_deployment_id.py - This script is deprecated. Please remove any dependencies on it.")
exit(0)

@ -0,0 +1,38 @@
import subprocess
def get_existing_token(splunk_uri):
    '''
    Returns an existing token for splunk_uri, or None if there
    is no active session for the given uri.

    NOTE(review): splunk_uri is interpolated into a shell command, so callers
    must only pass trusted values. The returned token is bytes on Python 3 —
    confirm downstream consumers expect that.
    '''
    proc = subprocess.Popen('splunk _authtoken %s' % splunk_uri,
                            shell=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    out, _err = proc.communicate()
    if proc.returncode == 0:
        return out.strip()
    return None
def login(splunk_uri):
    '''
    Prompts the user to login to the given splunk_uri.

    Raises subprocess.CalledProcessError if the login command exits non-zero.
    '''
    subprocess.check_call(('splunk login -uri %s' % splunk_uri), shell=True)
def get_token(splunk_uri):
    '''
    Get a token for the given splunk_uri, prompting
    the user to login if required.
    '''
    token = get_existing_token(splunk_uri)
    if token:
        return token
    # No active session — interactively log in, then retry the lookup.
    login(splunk_uri)
    return get_existing_token(splunk_uri)

@ -0,0 +1,324 @@
'''
Methods for determining client (browser) eligibility.
For reference, all server roles defined by ServerRoles.cpp are:
- indexer
- universal_forwarder
- heavyweight_forwarder
- lightweight_forwarder
- license_master
- cluster_master
- cluster_slave
- cluster_search_head
- deployment_server
- deployment_client
- search_head
- search_peer
- kv_store
- management_console
- shc_captain
- shc_member
- shc_deployer
This list may change. Refer to _predefinedRole_literals in
ServerRoles.cpp to be certain.
'''
import sys
def get_eligibility(services, opt_in_version=None, username=None):
    '''
    Gathers eligibility data describing the instrumentation actions that
    the current server supports for the given user.
    IMPORTANT: Provided services should be configured with system access,
    as we will be using them to read the distributed search config.
    Legacy Behavior (up to and including Splunk 7.0):
    This method returned a dict with 2 fields:
    - is_eligible
      + Should the instrumentation UI be shown?
    - reason
      + Why not?
    This result was augmented in the instrumentation controller to take user
    capabilities into account and returned to the caller (often the instrumentation
    page, or some other page checking if it should show the opt-in modal).
    However, as the UI has grown more complex - integration diag UI and its
    own visibility constraints - this proved to be inflexible.
    Current Behavior (as of Nightlight):
    To allow the UI to handle more complicated user capability and server
    configuration scenarios, we've begun to include information about all
    relevant checks in separate fields. To include:
    - messages
      + May include warnings/errors to be displayed on the UI
    - can_server_edit_telemetry_settings
      + True for single instance and search heads
    - can_server_get_diag
      + True for single instance and search heads
    - is_client_agreement_current
      + Is the optInVersion given by the client up to date?
      + This is used to reject clients that have not yet updated
        their code to reflect the most recent data sharing
        terms.
    - can_user_edit_telemetry_settings
      + True if the requested user has edit_telemetry_settings
    - can_user_get_diag
      + True if the requested user has get_diag
    - can_show_telemetry_ui
      + True if telemetry settings related UI should be shown
      + This includes the opt-in modal and the telemetry settings & report logs
        on the instrumentation page (but not diag UI).
    - can_show_diag_ui
      + True if diag related UI should be shown
    Note: Though some of these fields are based on the same criteria now, they
    are kept separate to allow them to vary independently in the future
    without requiring client code changes.
    As some pages may still be compiled against outdated opt-in modals from Splunk
    core, the legacy flags are still supported and retain their original meaning.
    These can now be understood as:
    - is_eligible
      + Should the telemetry-specific instrumentation UI be shown?
      + Equivalent to new 'can_show_telemetry_ui' field
    - reason
      + Why not?
    '''
    # Start from "everything allowed" and switch flags off as checks fail.
    result = {
        'messages': [],
        # Legacy fields
        'is_eligible': True,
        'reason': None,
        # Server eligibility fields
        'can_server_edit_telemetry_settings': True,
        'can_server_get_diag': True,
        # Client eligibility fields
        'is_client_agreement_current': True,
        # User eligibility fields
        'can_user_edit_telemetry_settings': True,
        'can_user_get_diag': True,
        # Summary fields
        'can_show_telemetry_ui': True,
        'can_show_diag_ui': True,
    }
    if services.server_info_service.content.get('isFree', '0') == '1':
        # Skip the following calls, as they will throw on free Splunk,
        # and the checks they make are not required.
        return result
    unsupported_server_msg = (
        'Instrumentation settings are not accessible on this server. '
        'Please access the settings from a search head.'
    )
    # Server-role check: only search heads / single instances qualify.
    if (not check_server_roles_for_eligibility(
            services.server_info_service.content.get('server_roles'))):
        result['can_server_edit_telemetry_settings'] = False
        result['can_server_get_diag'] = False
        # Legacy fields record only the FIRST failure reason.
        if result['is_eligible']:
            result['is_eligible'] = False
            result['reason'] = unsupported_server_msg
    # Client agreement check: '*' means "any version accepted".
    currentOptInVersion = services.telemetry_conf_service.content.get('optInVersion')
    if opt_in_version != '*' and opt_in_version != currentOptInVersion:
        result['is_client_agreement_current'] = False
        if result['is_eligible']:
            result['is_eligible'] = False
            result['reason'] = 'The client does not support the current instrumentation agreement'
    # If we're not running on a free license (where there are no users),
    # validate that the user has the requisite capabilities.
    if services.server_info_service.content.get('isFree', '0') != '1' and username:
        capabilities = (
            services.splunkd.get_json(
                '/services/authentication/users/%s' % (username if sys.version_info >= (3, 0) else username.encode('utf-8'))
            )
            ['entry'][0]['content']['capabilities']
        )
        if ('edit_telemetry_settings' not in capabilities):
            result['can_user_edit_telemetry_settings'] = False
            if result['is_eligible']:
                result['is_eligible'] = False
                result['reason'] = 'You do not have permissions to edit telemetry settings'
        if ('get_diag' not in capabilities):
            result['can_user_get_diag'] = False
    # Warn (but do not block) when some search peers lack the _telemetry index.
    if not check_peers_have_telemetry_index(services):
        result['messages'].append({
            'text': ('Some search peers are incompatible with instrumentation. '
                     'To use these features, ensure that all peers are running '
                     'Splunk Enterprise 6.5.0 or later.'),
            'type': 'warning'
        })
    # Only want to show the unsupported server message if
    # the user would otherwise be able to access some
    # functionality.
    # Note: May need a separate message for each feature
    # in the future, but at the moment the server
    # requirements are the same for telemetry &
    # diag, so we use one unified message.
    if (
        (result['can_user_edit_telemetry_settings'] and
         not result['can_server_edit_telemetry_settings'])
        or
        (result['can_user_get_diag'] and
         not result['can_server_get_diag'])
    ):
        result['messages'].append({
            'type': 'warning',
            'text': unsupported_server_msg
        })
    # Summary flags: user AND server must both allow the feature.
    result['can_show_telemetry_ui'] = (
        result['can_user_edit_telemetry_settings'] and
        result['can_server_edit_telemetry_settings'] and
        result['is_client_agreement_current']
    )
    result['can_show_diag_ui'] = (
        result['can_user_get_diag'] and
        result['can_server_get_diag']
    )
    return result
def check_server_roles_for_eligibility(server_roles):
    '''
    Args:
        - server_roles: A list of server roles (strings)
    Returns:
        True or False, indicating whether this server type is supported
    '''
    roles = set(server_roles)

    # The whitelist determines which nodes are even considered for
    # instrumentation eligibility: a node must carry at least one of these
    # roles (it may still be rejected by the blacklist or special cases).
    whitelist = {
        # Search heads are the typical place to access the UI.
        'search_head',
        # Some search heads lack the search_head role and instead report as
        # shc_member or shc_captain.
        'shc_member',
        'shc_captain',
        'cluster_search_head',
        # Indexer is whitelisted to cover single-instance deployments: a
        # single instance is not a "search head" (search heads only exist
        # when paired with separate indexers).
        'indexer',
    }

    # Any node carrying one of these roles is rejected outright.
    blacklist = {
        # The cluster master does not propagate conf settings to the search
        # heads, so it is rejected to avoid inconsistent cluster configs.
        'cluster_master',
        # Indexers are whitelisted only for the single-instance case; in a
        # distributed deployment settings must be edited on the SHs (which
        # propagate correctly), so cluster_slave is rejected.
        'cluster_slave',
    }

    # Roles that identify a node as some flavor of search head.
    search_head_like = {'cluster_search_head', 'search_head', 'shc_captain', 'shc_member'}

    rejected_by_special_case = (
        # An indexer that is also a search peer is in a distributed
        # environment, where the UI must be accessed from a search head.
        # The explicit search_head exemption covers the DMC case, where
        # search heads are themselves search peers of the DMC node.
        ({'indexer', 'search_peer'} <= roles and 'search_head' not in roles)
        or
        # heavyweight_forwarder used to be blacklisted outright, but
        # SPL-151920 showed the role is transiently added to search heads
        # when forwarding is configured from the UI (until restart) due to
        # a runtime-vs-startup incongruency in role detection (runtime sets
        # it when tcpouts are enabled; startup only checks for the
        # SplunkForwarder app). See SPL-76190, or in splunk main:
        #   git --no-pager grep -EHni 'declareServerRole.*heavy' -- '*.h' '*.cpp'
        # So reject it only when the node is not search-head-like.
        ('heavyweight_forwarder' in roles and not (roles & search_head_like))
    )

    return (bool(roles & whitelist)
            and not (roles & blacklist)
            and not rejected_by_special_case)
def check_peers_have_telemetry_index(services):
    '''
    True when distributed search is disabled, or when every enabled, up
    search peer exposes the _telemetry index.
    '''
    splunkd = services.splunkd
    dist_search_disabled = True
    try:
        dist_search_disabled = (
            splunkd.get_json('/services/search/distributed/config')
            ['entry'][0]['content']['disabled']
        )
    except Exception:
        # Proceed as if it's disabled if we can't hit the endpoint
        pass
    if dist_search_disabled:
        return True
    search_peers = splunkd.get_json('/services/search/distributed/peers')['entry']
    if search_peers:
        for peer in search_peers:
            content = peer['content']
            is_active = content['status'].lower() == 'up' and not content['disabled']
            if is_active and '_telemetry' not in content['searchable_indexes']:
                return False
    return True
# Manual smoke test: print eligibility for a local splunkd, authenticating
# via a token obtained from the Splunk CLI.
if __name__ == '__main__':
    from splunk_instrumentation.splunkd import Splunkd
    from splunk_instrumentation.service_bundle import ServiceBundle
    from splunk_instrumentation.cli_token import get_token
    bundle = ServiceBundle(Splunkd(token=get_token('https://localhost:8089')))
    from pprint import pprint
    pprint(get_eligibility(bundle, opt_in_version='*'))

@ -0,0 +1,114 @@
from __future__ import absolute_import
import random
import os
import sys
import json
import splunk.rest as rest
from splunk_instrumentation.datetime_util import utcNow
# Bookkeeping file and KV store / index naming constants.
LAST_READ_TIME_FILE = '.last_read'
COLLECTION_NAME = "instrumentation"
INSTRUMENTATION_SOURCETYPE = 'splunk_telemetry'
INSTRUMENTATION_INDEX_NAME = '_telemetry'
INTROSPECTION_INDEX_NAME = '_introspection'
AUDIT_INDEX_NAME = '_telemetry'
AUDIT_SOURCETYPE = "splunk_telemetry_log"
# Seconds the scripted input sleeps before starting work (1 in DEV mode, below).
INST_PRE_EXECUTE_SLEEP = 60
INST_VERSION = 1
DEPLOYMENT_ID_PREFIXES = ['CLOUD', 'CLOUDLIGHT']
# QUICKDRAW_URL is the discovery endpoint queried for the telemetry destination;
# DEFAULT_QUICKDRAW is the fallback destination used if discovery is unavailable.
# Both may be overridden through environment variables.
DEFAULT_QUICKDRAW = {"url": "https://e1345286.api.splkmobile.com/1.0/e1345286"}
if os.environ.get('DEFAULT_QUICKDRAW'):
    DEFAULT_QUICKDRAW = {"url": os.environ.get('DEFAULT_QUICKDRAW')}
QUICKDRAW_URL = os.environ.get('QUICKDRAW_URL') or "https://quickdraw.splunk.com/telemetry/destination"
# Runtime behavior switches, each overridable via the environment.
INST_DEBUG_LEVEL = os.environ.get('INST_DEBUG_LEVEL') or "ERROR"
INST_MODE = os.environ.get('INST_MODE') or "INPUT"
INST_NO_COLLECT = os.environ.get('INST_NO_COLLECT') or False
INST_NO_SEND = os.environ.get('INST_NO_SEND') or False
INST_COLLECT_DATE = os.environ.get('INST_COLLECT_DATE') or False
INST_SCHEMA_FILE = os.environ.get('INST_SCHEMA_FILE') or os.path.dirname(os.path.realpath(__file__)) + '/schema.json'
# Unique id for this execution: 30 random hex chars unless supplied.
INST_EXECUTION_ID = os.environ.get('INST_EXECUTION_ID') or "".join(random.choice('0123456789ABCDEF') for i in range(30))
INST_EXECUTION_START_TIME = utcNow()
INST_KV_OWNER = "nobody"
INST_APP_NAME = "splunk_instrumentation"
INST_PROFILE_OWNER = "nobody"
INST_PROFILE_APP = "splunk_instrumentation"
INST_LICENSE_TYPES = ['anonymous', 'license']
# Connection defaults for talking to the local splunkd.
SPLUNKD_URI = rest.makeSplunkdUri()
SPLUNKRC = {"token": os.environ.get('INST_TOKEN') or None,
            "server_uri": os.environ.get('INST_SERVER_URI') or SPLUNKD_URI
            }
# Maps UI visibility names to their telemetry.conf field names.
VISIBILITY_FIELDS_BY_NAME = {
    "license": "sendLicenseUsage",
    "anonymous": "sendAnonymizedUsage",
    "support": "sendSupportUsage"
}
VISIBILITY_CONF_FIELDS = [
    'sendLicenseUsage',
    'sendAnonymizedUsage',
    'sendSupportUsage'
]
# Conf fields a cluster master must expose in its master settings.
CLUSTER_MASTER_REQUIRED_CONF_FIELDS = VISIBILITY_CONF_FIELDS + ['deploymentID', 'swaEndpoint',
                                                                'telemetrySalt', 'scheduledDay',
                                                                'scheduledHour', 'reportStartDate']
# Diag files older than this many seconds (30 days) get purged.
MAX_DIAG_AGE = 86400 * 30
# REST endpoint paths used throughout the app.
ENDPOINTS = {
    'APP_INFO': {
        'INFO': 'telemetry/general',
        'READONLY_INFO': 'configs/conf-telemetry/general',
        'RETRY': 'telemetry/general/retryEdit'
    },
    'SERVER_INFO': 'server/info',
    'KV_STORE': {
        'DEPLOYMENT_ID': 'storage/collections/data/instrumentation/instrumentation_deploymentID'
    },
    'MASTER_SETTINGS': 'telemetry/general/masterSettings',
    "DIAG_ENDPOINT": 'diag',
    "DIAG_STATUS": 'diag/status'
}
# Human-readable labels for diag generation states.
DIAG_STATUS_MSG = {
    'SUCCESS': 'Succeeded',
    'NOT_EXIST': 'Does not exist',
    'FAIL': 'Failed',
    'PROGRESS': 'In progress'
}
# Adds our bin directory to the path, it should be on there anyway, but no harm...
path = os.path.realpath(os.path.dirname(os.path.realpath(__file__)) + '/../')
sys.path.append(path)
# scripted input [instrumentation.py] specific configs for executing savedsearches in batches
BATCHES_PER_HOUR = 6
BATCHES_MAX_SIZE = 12
DEFAULT_BATCH_NUM = 0  # must be between [0, BATCHES_MAX_SIZE)
# loads dev setup: overrides connection/timing defaults when INST_MODE=DEV.
if INST_MODE == "DEV":
    if not os.environ.get("SPLUNK_DB"):
        # Join ALL segments with os.path.join; the previous code concatenated
        # SPLUNK_HOME and 'var' with no separator, producing paths like
        # "/opt/splunkvar/lib/splunk".
        os.environ['SPLUNK_DB'] = os.path.join(os.environ.get('SPLUNK_HOME'), 'var', 'lib', 'splunk')
    INST_PRE_EXECUTE_SLEEP = 1
    if os.environ.get("SPLUNK_USERNAME"):
        # Username/password credentials from the environment win.
        SPLUNKRC = {
            "username": os.environ.get("SPLUNK_USERNAME"),
            "password": os.environ.get("SPLUNK_PASSWORD")
        }
    elif not os.environ.get('INST_KEY'):
        # Otherwise fall back to a local splunkrc.json next to this module;
        # close the handle instead of leaking it.
        rc_file = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'splunkrc.json')
        with open(rc_file, 'r') as fh:
            SPLUNKRC = json.load(fh)

@ -0,0 +1,24 @@
# Registry of all known DataPoint subclasses, in registration order.
dataPointClasses = []


def dataPointFactory(dataPointSchema, options=None):
    '''
    Instantiates the registered DataPoint subclass whose class name equals
    dataPointSchema.type.

    Returns False when no registered class matches.
    '''
    # Default is None instead of a shared mutable {}; a fresh dict per call
    # preserves the old effective behavior without the aliasing pitfall.
    if options is None:
        options = {}
    for dataPointClass in dataPointClasses:
        if dataPointClass.__name__ == dataPointSchema.type:
            return dataPointClass(dataPointSchema, options)
    return False


def registerDataPoint(dataPointClass):
    '''
    Adds dataPointClass to the registry, unless a class with the same name
    is already registered (duplicate registrations are ignored).
    '''
    for dataPointClassOther in dataPointClasses:
        if dataPointClass.__name__ == dataPointClassOther.__name__:
            return
    dataPointClasses.append(dataPointClass)


class DataPoint(object):
    '''
    Base class for collectable data points. Subclasses register themselves
    via registerDataPoint() and are instantiated through dataPointFactory().
    '''

    def __init__(self, dataPointSchema, options=None):
        # dataPointSchema: schema object describing this data point.
        self.dataPointSchema = dataPointSchema
        # Normalize falsy options (None, {}) to a fresh empty dict.
        self.options = options or {}
        super(DataPoint, self).__init__()

@ -0,0 +1,14 @@
from __future__ import absolute_import
from splunk_instrumentation.dataPoints.data_point import DataPoint
from splunk_instrumentation.dataPoints.data_point import registerDataPoint
class MockDataPoint(DataPoint):
    '''
    Data point that "collects" canned results embedded in its schema —
    no live collection is performed.
    '''

    def __init__(self, dataPointSchema, options=None):
        # None default instead of a shared mutable {}; DataPoint normalizes
        # falsy options to a fresh empty dict, so behavior is unchanged.
        super(MockDataPoint, self).__init__(dataPointSchema, options)

    def collect(self):
        # The schema object carries its own 'results' payload.
        return self.dataPointSchema.dataPointSchema['results']


registerDataPoint(MockDataPoint)

@ -0,0 +1,50 @@
from __future__ import absolute_import
from splunk_instrumentation.dataPoints.data_point import DataPoint
from splunk_instrumentation.dataPoints.data_point import registerDataPoint
from splunk_instrumentation.datetime_util import localNow
from splunk_instrumentation.report import report
class ReportDataPoint(DataPoint):
    # Data point whose values come from the in-memory `report` object,
    # arranged into a nested dict according to the schema's 'mapping' list.

    def __init__(self, dataPointSchema, options={}):
        super(ReportDataPoint, self).__init__(dataPointSchema, options)

    def collect(self, dateRange):
        '''
        :param dateRange: dict("start" : date , "stop" : date)
        :return: list with a single {"data": <nested results dict>} event
        Note: dateRange is accepted for interface parity with other data
        points but is not used here.
        '''
        def nested_set(dic, path, value):
            # Sets `value` at the dot-separated `path` inside `dic`,
            # creating intermediate dicts as needed. A path containing
            # exactly one '[' marks the leaf as a list: the value is
            # appended instead of assigned. Note the leaf key keeps the
            # bracket text verbatim (path is only split on '.').
            array_test = path.split("[")
            array_test = len(array_test) == 2
            keys = path.split(".")
            for key in keys[:-1]:
                dic = dic.setdefault(key, {})
            if array_test:
                dic.setdefault(keys[-1], [])
                dic[keys[-1]].append(value)
            else:
                dic[keys[-1]] = value

        mappings = self.dataPointSchema.dataPointSchema.get('mapping')
        results = {}
        for mapping in mappings:
            path = mapping.get('path')
            report_path = mapping.get('report_path')
            # Value source: a report lookup, overridden by a formatted
            # local "now" when 'date_value' (a strftime format) is present.
            # NOTE(review): if a mapping has neither report_path nor
            # date_value, `data` may be unbound (or stale from the previous
            # iteration) — confirm schemas always set one of them.
            if report_path:
                data = report.get(report_path)
            date_value = mapping.get('date_value')
            if date_value:
                data = localNow().strftime(date_value)
            nested_set(results, path, data)
        eventList = [{"data": results}]
        return eventList


registerDataPoint(ReportDataPoint)

@ -0,0 +1,44 @@
from __future__ import absolute_import
from splunk_instrumentation.dataPoints.data_point import DataPoint
from splunk_instrumentation.dataPoints.data_point import registerDataPoint
from datetime import datetime, date, time
from splunk_instrumentation.indexing.instrumentation_index import InstrumentationIndex
from splunk_instrumentation.datetime_util import date_to_timestamp_str, local
class SPLDataPoint(DataPoint):
    # Data point collected by running SPL (or a saved search) against the
    # instrumentation index over a date range.

    def __init__(self, dataPointSchema, options={}):
        super(SPLDataPoint, self).__init__(dataPointSchema, options)

    def collect(self, dateRange):
        '''
        :param dateRange: dict("start" : date , "stop" : date)
        :return: events produced by the search
        '''
        saved_search = self.dataPointSchema.dataPointSchema.get('saved_search')
        spl = self.dataPointSchema.dataPointSchema.get('spl')
        # A saved search takes precedence over inline SPL.
        if saved_search:
            spl = ' '.join(['|', 'savedsearch', saved_search])
        splunkrc = self.options.get('splunkrc')
        instrumentationIndex = InstrumentationIndex(splunkrc=splunkrc)
        kwargs = {
            "earliest_time": dateRange['start'],
            "latest_time": dateRange['stop']
        }
        # Plain dates are widened to cover the whole local day:
        # start -> local midnight, stop -> end of the local day.
        # NOTE(review): datetime instances also pass isinstance(x, date);
        # confirm callers pass plain dates or already-bounded datetimes.
        if isinstance(kwargs['earliest_time'], date):
            kwargs['earliest_time'] = date_to_timestamp_str(
                datetime.combine(kwargs['earliest_time'], time.min).replace(tzinfo=local))
        if isinstance(kwargs['latest_time'], date):
            kwargs['latest_time'] = date_to_timestamp_str(
                datetime.combine(kwargs['latest_time'], time.max).replace(tzinfo=local))
        events = instrumentationIndex.query_runner.search(spl, **kwargs)
        return events


registerDataPoint(SPLDataPoint)

@ -0,0 +1,147 @@
from datetime import datetime, date, tzinfo, timedelta
import time as _time
import calendar
'''
All datetimes in this module are stored as UTC-aware values, e.g.:
    datetime.utcnow().replace(tzinfo=utc)
Dates, by contrast, are based on local time: daily reports are local-day
reports. Keeping datetimes in UTC avoids confusion between machines.
'''
ZERO = timedelta(0)
HOUR = timedelta(hours=1)


class UTC(tzinfo):
    """Fixed-offset tzinfo implementation for Coordinated Universal Time."""

    def tzname(self, dt):
        return "UTC"

    def utcoffset(self, dt):
        return ZERO

    def dst(self, dt):
        # UTC never observes daylight saving time.
        return ZERO
utc = UTC()
# A class capturing the platform's idea of local time.
STDOFFSET = timedelta(seconds=-_time.timezone)
DSTOFFSET = timedelta(seconds=-_time.altzone) if _time.daylight else STDOFFSET
DSTDIFF = DSTOFFSET - STDOFFSET


class LocalTimezone(tzinfo):
    """tzinfo backed by the platform's local-time (and DST) settings."""

    def utcoffset(self, dt):
        return DSTOFFSET if self._isdst(dt) else STDOFFSET

    def dst(self, dt):
        return DSTDIFF if self._isdst(dt) else ZERO

    def tzname(self, dt):
        return _time.tzname[self._isdst(dt)]

    def _isdst(self, dt):
        # Round-trip through mktime/localtime so the C library decides
        # whether DST applies to this wall-clock instant.
        probe = (dt.year, dt.month, dt.day,
                 dt.hour, dt.minute, dt.second,
                 dt.weekday(), 0, 0)
        return _time.localtime(_time.mktime(probe)).tm_isdst > 0
local = LocalTimezone()
def date_to_timestamp(dateObj):
    '''
    Converts a datetime, date, float, or int into an integer UTC timestamp
    in seconds. Unsupported types yield 0.
    '''
    # datetime first: it is a subclass of date but must keep its time part.
    if isinstance(dateObj, datetime):
        aware = dateObj if dateObj.tzinfo else dateObj.replace(tzinfo=utc)
        return int(calendar.timegm(aware.astimezone(utc).timetuple()))
    # A bare date means midnight local time on that day.
    if isinstance(dateObj, date):
        midnight = datetime.combine(dateObj, datetime.min.time()).replace(tzinfo=local)
        return int(calendar.timegm(midnight.astimezone(utc).timetuple()))
    # Numeric values are assumed to already be UTC seconds.
    if isinstance(dateObj, (float, int)):
        return int(dateObj)
    return 0
def datetime_to_date(dateObj):
    '''Converts an aware datetime into the corresponding local-timezone date.'''
    return dateObj.astimezone(local).date()
def date_to_datetime(dateObj):
    '''Normalizes any supported time value into a UTC-aware datetime.'''
    return datetime.utcfromtimestamp(date_to_timestamp(dateObj)).replace(tzinfo=utc)
def date_to_timestamp_str(dateObj):
    # String form of the integer UTC timestamp (see date_to_timestamp).
    return "%d" % date_to_timestamp(dateObj)
def localNow():
    # Current time as an aware datetime in the machine's local timezone.
    return datetime.now(local)
def utcNow():
    # Current time as an aware UTC datetime.
    return datetime.utcnow().replace(tzinfo=utc)
def get_time():
    # Current UTC time as a time.struct_time.
    return _time.gmtime()
def local_date_to_utc(date, time):
    '''Combines a local date and time into a UTC-aware datetime.'''
    # NOTE: the parameter names shadow the module's datetime.date /
    # datetime.time imports; kept for interface compatibility.
    localized = datetime.combine(date, time).replace(tzinfo=local)
    return localized.astimezone(utc)
# date is always based on local machine timezone
def today():
    # Today's date per the local clock.
    return date.today()
def str_to_date(string):
    '''
    Expects a YYYY-MM-DD string, returns a date object.
    '''
    year, month, day = (int(part) for part in string.split('-'))
    return date(year, month, day)
def json_serial(obj):
    """JSON serializer for objects not serializable by default json code"""
    # datetime is a subclass of date, so one isinstance check covers both;
    # isoformat() keeps the time component for datetimes.
    if isinstance(obj, (date, datetime)):
        return obj.isoformat()
    raise TypeError("Type not serializable")

@ -0,0 +1,195 @@
import json
import uuid
import splunk_instrumentation.constants as constants
from splunk_instrumentation.service_bundle import ServiceBundle
import splunk_instrumentation.splunklib as splunklib
from splunk_instrumentation.splunklib.data import Record
class DeploymentIdManager(object):
'''
Manages the initialization and retrieval of the deployment ID.
Historical Note: The deployment ID was originally stored in the KV
store. This approach was abandoned due to stability and availability
concerns of the KV store. This class handles detecting existing
deployment ID settings in the KV store and migrates it to telemetry.conf.
The conf file approach should be more reliable than the KV store (since
conf files are available on all splunk product types, cannot be disabled
by the user, and do not rely on an external mongo process).
'''
# Randomly generated namespace to use when generating uuids
instrumentationUuidNamespace = uuid.UUID('6622c08d-93f1-4af0-bb9a-c58580975285')
# The deployment ID for this splunk deployment
deployment_id = None
def __init__(self,
splunkd,
services=None,
telemetry_conf_service=None,
server_info_service=None):
self.services = services or ServiceBundle(
splunkd,
telemetry_conf_service=telemetry_conf_service,
server_info_service=server_info_service)
self.splunkd = self.services.splunkd
self.telemetry_conf_service = self.services.telemetry_conf_service
self.server_info_service = self.services.server_info_service
self._kv_store_deployment_id = None
self._prefix = None
def get_deployment_id(self, no_create=False):
"""
Gets the deployment ID for this splunk instance.
If no deployment ID is known yet, the behavior depends
on the TelemetryConfService used to instantiate this object.
- If the TelemetryConfService is read only, `None` is returned.
- If the TelemetryConfService is writable, any deployment ID in
the KV store is migrated to the conf file, or a new one is
generated and written to the conf file.
"""
if self.deployment_id is not None:
return self.deployment_id
self.deployment_id = self.telemetry_conf_service.content.get('deploymentID')
deployment_id_is_in_conf_file = self.deployment_id is not None
can_write_conf_file = not self.telemetry_conf_service.is_read_only
# Check the KV store for an existing ID
if not self.deployment_id and self.kv_store_is_available():
if self.has_deployment_id_in_kv_store():
self.deployment_id = self.kv_store_deployment_id
# We can't write to the conf file, so we don't want to generate a new
# deployment ID either. (Since we can't persist it, it would be a one-time-
# only deployment ID.) Instead, we'll just return what we have, which will
# be either `None`, or the deployment ID from the KV store.
if not can_write_conf_file or no_create:
# May still be `None`!
return self.deployment_id
# Create ID on demand if none yet exists
if self.deployment_id is None:
self.generate_new_deployment_id()
if not deployment_id_is_in_conf_file and can_write_conf_file:
self.write_deployment_id_to_conf_file()
return self.deployment_id
def sync_deployment_id(self):
'''
Get deployment id from cluster master
'''
# If we can't persist the value to the conf file,
# let's not bother trying to fetch it from the cluster master[s],
# or else we'll end up going back to the cluster master every time
# we need to look it up. Considering that this lookup may happen
# more than once per page load, the performance impact could be
# significant.
if self.telemetry_conf_service.is_read_only:
return
try:
resp = self.splunkd.request(
constants.ENDPOINTS['MASTER_SETTINGS'],
method='GET',
owner='nobody',
app=constants.INST_APP_NAME)
data = splunklib.data.load(resp.get('body').read())
entry = data['feed'].get('entry')
if entry:
if type(entry) is list:
deploymentList = [value['content'].get('deploymentID') for value in entry]
if deploymentList:
deploymentList.sort()
self.deployment_id = deploymentList[0]
elif type(entry) is Record:
self.deployment_id = entry['content'].get('deploymentID')
if self.deployment_id:
self.write_deployment_id_to_conf_file()
except Exception:
# Cluster master sync is best-effort only
pass
def generate_new_deployment_id(self):
'''
Generates a new deployment id and saves it to self.deployment_id
'''
prefix = self.get_deployment_id_prefix()
self.deployment_id = str(uuid.uuid5(self.instrumentationUuidNamespace,
self.server_info_service.content.get('master_guid')))
if prefix:
self.deployment_id = prefix + '-' + self.deployment_id
def get_deployment_id_prefix(self):
'''
Determines the correct deployment ID prefix for this deployment.
'''
if self._prefix:
return self._prefix
prefix = None
if self.server_info_service.is_cloud():
prefix = 'CLOUD'
if self.server_info_service.is_lite():
prefix += 'LIGHT'
self._prefix = prefix
return self._prefix
def kv_store_is_available(self):
'''
Returns true if the kv store status is reported as "ready"
'''
return self.server_info_service.content.get('kvStoreStatus') == 'ready'
def has_deployment_id_in_kv_store(self):
'''
Returns True if the KV store has an entry for the deployment ID.
'''
return self.kv_store_deployment_id is not None
def set_deployment_id(self, deployment_id):
'''
Explicitly sets the deployment ID.
'''
self.deployment_id = deployment_id
self.write_deployment_id_to_conf_file()
def write_deployment_id_to_conf_file(self):
'''
Writes self.deployment_id to the telemetry conf file.
'''
self.telemetry_conf_service.update({
'deploymentID': self.deployment_id
})
self.telemetry_conf_service.fetch()
self.deployment_id = self.telemetry_conf_service.content.get('deploymentID')
@property
def kv_store_deployment_id(self):
    '''
    Lazily fetches (and caches) the deployment ID stored in the KV store.
    Returns None when the lookup fails for any reason (KV store down,
    document missing, malformed body) -- callers treat None as
    "not present".
    '''
    try:
        if not self._kv_store_deployment_id:
            resp = self.splunkd.get(
                constants.ENDPOINTS['KV_STORE']['DEPLOYMENT_ID'],
                owner=constants.INST_KV_OWNER,
                app=constants.INST_APP_NAME)
            # Response body is a JSON document; its 'value' field holds the ID.
            field_descriptor = json.loads(resp['body'].read())
            self._kv_store_deployment_id = field_descriptor['value']
        return self._kv_store_deployment_id
    except Exception:
        # Best effort: any error means "no ID available from KV store".
        return None

@ -0,0 +1,260 @@
import re
import splunk_instrumentation.constants as constants
def host_from_uri(uri):
    """
    Extracts the host portion of an http(s) URI, e.g.
    'https://example.com:8089/x' -> 'example.com'.
    Returns None when the string does not look like an http(s) URL.
    """
    found = re.search(r'https?://([^:/]+)', uri)
    return found.group(1) if found else None
def make_node(host, uri, roles, auth_method):
    """
    Builds the canonical node-descriptor dict used in the node list result.
    """
    node = {}
    node['host'] = host
    node['uri'] = uri
    node['roles'] = roles
    node['authMethod'] = auth_method
    return node
class NodeList(object):
    """
    Collecting the list of nodes in a deployment (search heads, cluster
    master, indexers/search peers) by querying splunkd REST endpoints.
    Lookup failures are accumulated in err_msgs rather than raised.
    """
    def __init__(self, service, server_uri=constants.SPLUNKD_URI):
        # service: client exposing get_json() against splunkd
        # server_uri: management URI of the local instance
        self.nodes = []
        self.err_msgs = []
        self.service = service
        self.server_uri = server_uri

    def get_server_roles(self):
        # Roles reported by the local instance via /services/server/info.
        return self.service.get_json('/services/server/info')['entry'][0]['content']['server_roles']

    def get_server_conf_cluster_master_uris(self):
        """
        This method is used to read cluster master stanzas from server.conf .
        :return: the cluster master uris from server.conf cm stanzas if there is one or more, [] otherwise
        """
        cm_uris = []
        server_conf_dict = {}
        server_conf_resp = self.service.get_json('/services/configs/conf-server')
        # Add all stanzas in server conf to a dict for later use
        for stanza in server_conf_resp['entry']:
            server_conf_dict[stanza['name']] = stanza
        if 'clustering' in server_conf_dict:
            # Bug SPL-203589, telemetry info is incorrect when search head has 2 indexer cluster connected
            # /services/configs/conf-server changed, we need parse the CM info from the new field
            # Prefer the legacy 'master_uri' key; fall back to 'manager_uri'.
            key_in_content = 'master_uri'
            if not key_in_content in server_conf_dict['clustering']['content'] or not server_conf_dict['clustering']['content'][key_in_content]:
                key_in_content = 'manager_uri'
            # The clustering stanza's value names the per-CM stanzas.
            cm_stanza_names = server_conf_dict['clustering']['content'][key_in_content].split(',')
            if isinstance(cm_stanza_names, list) and len(cm_stanza_names) > 0:
                for stanza_name in cm_stanza_names:
                    if stanza_name in server_conf_dict:
                        cm_uris.append(server_conf_dict[stanza_name]['content'][key_in_content])
        return cm_uris

    def is_search_head(self):
        # True when the local instance reports the 'search_head' role.
        return 'search_head' in self.get_server_roles()

    def is_shc_enabled(self):
        # True when search head clustering is configured and not disabled.
        try:
            shc_config = self.service.get_json('/services/shcluster/config')
            return not shc_config['entry'][0]['content']['disabled']
        except Exception:
            # May fail if user doesn't have list_search_head_clustering capability
            return False

    def append_self(self):
        """
        Adds the local node to the node list result
        """
        if self.is_search_head():
            role = 'search head'
        else:
            role = 'single instance'
        self.nodes.append(make_node(host=host_from_uri(self.server_uri),
                                    uri=self.server_uri,
                                    roles=[role],
                                    auth_method=role))

    def append_shc_members(self):
        """
        Adds SHC members to the node list result
        """
        try:
            shc_member_entries = self.service.get_json('/services/shcluster/member/members', count=-1)['entry']
            # print SHC_members_info
            for member in shc_member_entries:
                roles = ['SHC member']
                if member['content']['is_captain']:
                    roles.append('SHC captain')
                uri = member['content']['mgmt_uri']
                self.nodes.append(make_node(host=host_from_uri(uri),
                                            uri=uri,
                                            roles=roles,
                                            auth_method='search head'))
        except Exception:
            self.err_msgs.append('Could not locate any search head cluster members in this deployment')

    def append_cluster_master(self):
        """
        Adds cluster master to the node list result.
        """
        try:
            cluster_master_info = self.service.get_json('/services/cluster/config')['entry'][0]['content']
            if cluster_master_info['disabled']:
                return None

            def append_nodes(host_uri):
                # Helper: record one cluster master node.
                self.nodes.append(make_node(host=host_from_uri(host_uri),
                                            uri=host_uri,
                                            roles=['cluster master'],
                                            auth_method='cluster master'))
            # If there is more than 1 CM, this output is a '?', so check if valid uri
            if re.match(r'https?://[^:/]+', cluster_master_info['master_uri']):
                uri = cluster_master_info['master_uri']
                append_nodes(uri)
            else:
                # Fetch cm uri's from server.conf
                cluster_master_uris = self.get_server_conf_cluster_master_uris()
                if cluster_master_uris:
                    for uri in cluster_master_uris:
                        append_nodes(uri)
        except Exception:
            self.err_msgs.append('Failed to locate a Cluster Master in this deployment')

    def append_search_peers(self):
        """
        Adds search peers to the node list result
        """
        try:
            search_peer_entries = self.service.get_json('/services/search/distributed/peers', count=-1)['entry']
            for peer in search_peer_entries:
                # Peer entry name is '<host>:<port>'; scheme depends on is_https.
                uri = "{scheme}://{host_port}".format(
                    scheme=('https' if peer['content']['is_https'] else 'http'),
                    host_port=peer['name']
                )
                roles = []
                for role in peer['content']['server_roles']:
                    clean_role_name = role.replace('_', ' ')
                    roles.append(clean_role_name)
                if not roles:
                    roles = ['indexer']
                host = peer['content']['host']
                if not host:
                    host = host_from_uri(uri) or uri
                self.nodes.append(make_node(host=host,
                                            uri=uri,
                                            roles=roles,
                                            auth_method='indexer'))
        except Exception:
            self.err_msgs.append('Failed to locate any search peers/indexers in this deployment')

    # # MVP does not support license master
    # def append_license_master(self):
    #     try:
    #         license_master_info = self.service.get('/services/licenser/localslave', output_mode='json')
    #         uri = json.loads(license_master_info.get('body').read())['entry'][0]['content']['master_uri']
    #         license_master = {'host': re.findall(r'https?://([^:/]+)', uri)[0],
    #                           'uri': uri,
    #                           'roles': ['license master'],
    #                           auth_token: 'license master'}
    #         self.nodes.append(license_master)
    #     except Exception as ex:
    #         self.err_msgs.append('Failed to locate a License Master in this deployment')

    def fetch_nodes(self):
        """
        Gets the License master, SHC Captain, SHC members, Cluster master and
        indexers/search peers
        :return: dictionary containing the list of nodes in a deployment
        """
        self.nodes = []
        self.err_msgs = []
        # Get the search heads
        if self.is_shc_enabled():
            self.append_shc_members()
        elif self.is_search_head():
            self.append_self()
        # Get the Cluster Master
        self.append_cluster_master()
        # Get the search peers or indexers
        self.append_search_peers()
        # Fall back to just the local node when nothing else was found.
        if len(self.nodes) == 0:
            self.append_self()
        return {'nodes': self.nodes, 'errors': self.err_msgs}
if __name__ == '__main__':
    '''
    This file can be run as a standalone CLI script for debugging.
    There are a few reasons you might do this:
    - To quickly iterate on the endpoint when the target splunk is
    on a remote host.
    - To easily use printf/pdb/ipdb for debugging local or remote
    splunks, without having to jump through hoops to setup a remote
    debugger connection to the endpoint process that splunkd
    spawns, or redirect error logs, etc.
    Usage:
    # CD to ensure python load path is setup correctly
    cd path/to/app_splunk_instrumentation/splunk_instrumentation/bin
    # Use splunk's python so the splunk libs are on the load path too
    splunk cmd python -m splunk_instrumentation.deployment_node_list
    # Alternatively, with a remote splunk
    splunk cmd python -m splunk_instrumentation.deployment_node_list https://remote_splunk:8089
    Example:
    $ splunk cmd python -m splunk_instrumentation.deployment_node_list
    {'errors': ['Could not locate any search head cluster members in this deployment',
    'Failed to locate a Cluster Master in this deployment'],
    'nodes': [{'authMethod': 'indexer',
    'host': u'9ac296fad4e8',
    'roles': [u'indexer', u'license_master', u'search_peer'],
    'uri': 'https://localhost:8090'}]}
    '''
    import sys
    from splunk_instrumentation.splunkd import Splunkd
    from splunk_instrumentation.cli_token import get_token
    from pprint import pprint
    # Target splunkd management URI: first CLI argument, or the local default.
    if len(sys.argv) > 1:
        splunk_uri = sys.argv[1]
    else:
        splunk_uri = 'https://localhost:8089'
    service = Splunkd(server_uri=splunk_uri, token=get_token(splunk_uri))
    node_list = NodeList(service)
    pprint(node_list.fetch_nodes())

@ -0,0 +1,9 @@
import uuid


def make_uuid():
    """
    Returns a random (version 4) UUID string, upper-cased to match the
    capitalization the splunkd diag endpoints enforce.
    """
    new_id = uuid.uuid4()
    return str(new_id).upper()

@ -0,0 +1,143 @@
import time
import sys
if sys.version_info >= (3, 0):
from queue import Queue
else:
from Queue import Queue
from threading import Thread
from splunk_instrumentation.diag import make_uuid
class Task(object):
    """
    Base class for units of work executed by BatchRunner.
    """
    def run(self):
        """
        Override in subclasses to perform the actual work.
        :return: {}
        """
        return {}

    def get_id(self):
        """
        Lazily assigns and returns a unique id for this task; every
        running task requires one.
        :return: str
        """
        if not hasattr(self, 'task_id'):
            self.task_id = make_uuid()
        return self.task_id

    def to_dict(self):
        """
        Metadata used for logging; subclasses may override.
        :return: {}
        """
        return {}
class BatchRunner(object):
    """
    Runs Task instances concurrently on a small pool of worker threads.

    Example:
        class MyTask(Task):
            def run(self):
                print("running")

        batch = BatchRunner()
        batch.add_task(MyTask())
        batch.add_task(MyTask())
        batch.run()
    """
    def __init__(self, config=None, limit=3):
        """
        :param config: object to be passed to the task object (exposed via
                       to_dict() for logging); defaults to a fresh dict
        :param limit: max concurrent threads
        """
        # Fix: `config` previously defaulted to a shared mutable {} (the
        # classic Python mutable-default pitfall); each instance now gets
        # its own dict.
        self.limit = limit
        self.task_queue = Queue()
        self.config = {} if config is None else config
        self.batch_id = make_uuid()

    def get_batch_id(self):
        """
        The unique ID for this batch run.
        :return: str
        """
        return self.batch_id

    def to_dict(self):
        """
        Metadata used for logging.
        :return: the batch config object
        """
        return self.config

    def work_thread(self):
        """
        Worker loop: takes the next task off the queue, runs it, and marks
        it done. Loops until the queue is observed empty. This method runs
        in its own (daemon) thread.
        """
        while not self.task_queue.empty():
            task = self.task_queue.get()
            try:
                task.run()
            except Exception:
                # Don't kill the worker thread
                # just because a single task barfed
                pass
            self.task_queue.task_done()

    def add_task(self, task):
        """
        Adds a Task instance to the queue.
        :param task: Task to run
        :return: None
        """
        self.task_queue.put(task)

    def run(self):
        """
        Main entry point to start a batch: spawns the worker threads and
        blocks until every queued task has been marked done.
        """
        self.spawn_threads()
        self.task_queue.join()

    def spawn_threads(self):
        """
        Creates up to self.limit daemon threads, each running work_thread.
        """
        for x in range(self.limit):
            worker = Thread(target=self.work_thread, args=())
            worker.daemon = True
            time.sleep(1)  # this is arbitrary wait just so that there is not three request at the same time
            worker.start()

@ -0,0 +1,35 @@
#!/usr/bin/python
import os
import sys
import json
"""Spawn multiple workers and wait for them to complete"""
# Make the app's bin directory importable before pulling in app modules.
path = os.path.realpath(os.path.join(os.path.dirname(os.path.realpath(__file__)),
                                     '..', '..', '..', 'bin'))
sys.path.append(path)
from splunk_instrumentation.diag.batch_runner import BatchRunner  # noqa
from splunk_instrumentation.diag.diag_task import DiagTask  # noqa
from splunk_instrumentation.splunkd import Splunkd  # noqa
from splunk_instrumentation.diag.diag_service import DiagService  # noqa
# The parent process passes the job description (token, server_uri, payload)
# as a single JSON document on stdin.
configurationStr = sys.stdin.read()
configuration = json.loads(configurationStr)
splunkd = Splunkd(token=configuration['token'], server_uri=configuration['server_uri'])
diag_service = DiagService(splunkd)
batchRunner = BatchRunner(config=configuration['payload'])
# All diags in this run share the batch runner's id.
diag_service.batch_id = batchRunner.batch_id
# One DiagTask per target node; all tasks share the payload's configuration.
for config in configuration['payload']['nodes']:
    batchRunner.add_task(DiagTask({"node": config, "configuration": configuration['payload']['configuration']},
                                  diag_service=diag_service))
# Report the batch id back to the parent before blocking on the batch.
print(json.dumps({'batch_id': batchRunner.batch_id}) + '\r\n')
sys.stdout.flush()
batchRunner.run()

@ -0,0 +1,89 @@
import json
import time
from splunk_instrumentation.constants import ENDPOINTS, INST_APP_NAME, DIAG_STATUS_MSG
class DiagService(object):
    """
    DiagService is a helper class that manages node diags and tracks them
    in events.
    """
    def __init__(self, splunkd=None, diag_filepath="", batch_id=None):
        """
        :param splunkd: splunkd client used for REST calls
        :param diag_filepath: path associated with diag output
        :param batch_id: unique id shared by all diags in one batch
        """
        self.splunkd = splunkd
        self.diag_filepath = diag_filepath
        self.batch_id = batch_id

    def get_batch_id(self):
        """
        Returns the unique id for the batch of diags being run; all diags
        in one batch share the same id.
        :return: batch id
        """
        return self.batch_id

    def create_node_diag_task(self, config):
        """
        Takes a config and calls the diag endpoint to create a diag, then
        polls the diag status endpoint until the diag leaves the
        in-progress state.
        :param config:
            { node : {},
              configuration : {}
            }
        :return: the final status record, or
            {'status': 'failed', 'description': ...} after repeated
            status-poll failures
        :raises Exception: when the create request fails or returns a
            non-2xx status
        """
        configuration = config['configuration'].copy()
        configuration.pop('mode', None)  # Remove debug only field
        body = json.dumps({
            'batchID': self.get_batch_id(),
            'node': {
                'uri': config['node']['uri'],
                'authMethod': config['node']['authMethod']
            },
            'configuration': configuration
        })
        try:
            create_resp = self.splunkd.service.request(ENDPOINTS['DIAG_ENDPOINT'],
                                                       method="POST",
                                                       body=body,
                                                       headers=[('content-type', 'application/json')],
                                                       owner='nobody',
                                                       app=INST_APP_NAME)
        except Exception as ex:
            raise Exception("Diag Remote Failed: {}".format(ex))
        if create_resp.status < 200 or create_resp.status >= 300:
            raise Exception("Diag Remote invalid status {}".format(create_resp.status))
        data = create_resp.get('body').read()
        data = json.loads(data)
        sync_error_count = 0
        while True:
            # Fix: `status` was previously left unbound when no matching
            # entry was found (NameError on first use), and was assigned
            # the raw JSON string instead of the parsed record (a str has
            # no .get(), so the in-progress check raised AttributeError).
            status = None
            try:
                statuses = self.splunkd.get_json(ENDPOINTS['DIAG_STATUS'],
                                                 owner='nobody',
                                                 app=INST_APP_NAME)
                for item in statuses:
                    json_item = json.loads(item)
                    # NOTE(review): 'daigID' looks like a typo for 'diagID',
                    # but it may mirror the field name splunkd actually
                    # emits -- confirm against the status endpoint before
                    # renaming.
                    if json_item['daigID'] == data['diagID']:
                        status = json_item
                        break
            except Exception as ex:
                sync_error_count += 1
                if sync_error_count > 5:
                    return {'status': 'failed', 'description': str(ex)}
            if status is not None and status.get('status') != DIAG_STATUS_MSG['PROGRESS']:
                return status
            time.sleep(2)

@ -0,0 +1,35 @@
from splunk_instrumentation.diag.batch_runner import Task
class DiagTask(Task):
    """
    A batch_runner Task that creates a diag on a remote node through
    DiagService.
    """
    def __init__(self, config, diag_service):
        """
        :param config: the diag's config from the request { node: ..., configuration: ... }
        :param diag_service: DiagService instance
        """
        self.diag_service = diag_service
        self.config = config

    def run(self):
        """
        Asks the diag service to create a diag on the configured node.
        :return:
            { diagFilename : "",
              size : n
            }
        """
        return self.diag_service.create_node_diag_task(self.config)

    def to_dict(self):
        """
        Logging metadata: the node portion of the config.
        :return: config['node']
        """
        return self.config.get('node')

@ -0,0 +1,82 @@
"""
BaseClass.
This base class manages connection to splunkd.
"""
import time
import splunk_instrumentation.splunklib as splunklib
from splunk_instrumentation.constants import SPLUNKRC, INST_APP_NAME
from splunk_instrumentation.splunkd import Splunkd
class BaseClass(object):
    """
    Base class that manages connections to splunkd: one connection scoped
    to an owner/app namespace for searching, and one unscoped connection
    for everything else.
    """
    def __init__(self, splunkrc=None, index_name=None, inst_key=None, inst_host=None, inst_port=None):
        """
        Constructor.
        Grab SPLUNKRC from parameter or from constants.py.
        (index_name / inst_key / inst_host / inst_port are accepted for
        interface compatibility but are not used here.)
        """
        self.splunkrc = splunkrc or SPLUNKRC
        self._splunkd_search = self.__connect_to_splunkd(search=True)
        self._splunkd = self.__connect_to_splunkd()

    # Public API
    # ----------
    def submit(self, event, host=None, source=None, sourcetype=None):
        """Submit a new event directly to the index.

        NOTE(review): relies on self._index, which is only set once
        _set_index() has been called.
        """
        self._index.submit(
            event, host=host, source=source, sourcetype=sourcetype)

    def search(self, search_cmd, **kwargs):
        """Submit a new search."""
        return self._query(search_cmd, **kwargs)

    # "Private" methods
    # -----------------
    def _set_index(self, name):
        # Ensure the index exists, then cache a handle and the name.
        if name:
            self.__ensure_index_exists(name)
            self._index = self._splunkd.indexes[name]
            self.index_name = name

    def __ensure_index_exists(self, name):
        if not self._splunkd.has_index(name):
            self._splunkd.indexes.create(name)

    def _query(self, search_cmd, **kwargs):
        """Query.
        Note that earliest is inclusive & latest is exclusive:
        [earliest, latest)
        (Prevents getting last-second events again during the next query)
        """
        job = self._splunkd_search.search(search_cmd, **kwargs)
        while not job.is_done():
            time.sleep(0.2)
        return splunklib.results.ResultsReader(job.results(count=0))

    def __connect_to_splunkd(self, search=False):
        # Search connections get an explicit owner/app namespace; plain
        # connections have any owner/app keys stripped.
        splunkrc_copy = dict(self.splunkrc)
        if search:
            # WITH NAMESPACE
            if 'owner' not in splunkrc_copy:
                splunkrc_copy['owner'] = '-'
            if 'app' not in splunkrc_copy:
                splunkrc_copy['app'] = INST_APP_NAME
        else:
            splunkrc_copy.pop('owner', None)
            splunkrc_copy.pop('app', None)
        # Fix: the non-search branch previously built splunkrc_copy but
        # then passed the ORIGINAL self.splunkrc to Splunkd, so owner/app
        # were never actually stripped.
        return Splunkd(**splunkrc_copy)

@ -0,0 +1,83 @@
import json
import logging
import sys
from splunk_instrumentation.splunkd import Splunkd
from splunk_instrumentation.constants import SPLUNKRC
from splunk_instrumentation.datetime_util import json_serial
from splunk_instrumentation.constants import INSTRUMENTATION_SOURCETYPE
class EventWriter(object):
    """ Event Writer class
    This class handles writing to the index.
    It grabs a splunkd object according to the splunkrc params provided:
    - If splunkrc is a dictionary, it will create a new splunkd object.
    - If given any other object type, that object is injected as _splunkd.
    """
    def __init__(self, splunkrc=None, index_name=None):
        """
        :param splunkrc: dict of Splunkd constructor params, or an already
                         constructed splunkd-like object
        :param index_name: index to write to; must already exist
        :raises Exception: when index_name is given but unavailable
        """
        self.splunkrc = splunkrc or SPLUNKRC
        self.socket = None
        self._index = None
        # isinstance (rather than an exact type check) also accepts dict
        # subclasses such as OrderedDict -- backward compatible.
        if isinstance(self.splunkrc, dict):
            self._splunkd = Splunkd(**self.splunkrc)
        else:
            self._splunkd = splunkrc
        if index_name:
            if self._splunkd.has_index(index_name):
                self._index = self._splunkd.get_index(index_name)
            else:
                logging.error('ERROR: INDEX IS NOT AVAILABLE')
                raise Exception("ERROR INDEX UNAVAILABLE")

    def submit(self, event, host=None, source=None, sourcetype=INSTRUMENTATION_SOURCETYPE):
        """Submits a single event over a short-lived attached socket."""
        # Note: We used to use the ordinary index.submit method from splunklib,
        # instead of using a socket. However, that method uses receivers/simple,
        # an endpoint that bypasses index time field extraction (which we rely on).
        temp_socket = self._index.attach(host=host, source=source, sourcetype=sourcetype)
        try:
            temp_socket.send(self.marshal_event(event))
        finally:
            # Fix: close the socket even when send() raises so a failed
            # submit cannot leak the connection.
            temp_socket.close()

    def open_socket(self, host=None, source=None, sourcetype=INSTRUMENTATION_SOURCETYPE):
        '''
        Opens a socket to stream events to be indexed, saving it as
        an instance variable for later use when submit_via_socket is called.
        :param host:
        :param source:
        :param sourcetype:
        :return: the opened socket
        '''
        self.socket = self._index.attach(host=host, source=source, sourcetype=sourcetype)
        return self.socket

    def close_socket(self):
        '''
        Closes the streaming socket (if open) and sets it to None.
        '''
        if self.socket:
            self.socket.close()
            self.socket = None

    def submit_via_socket(self, event):
        """
        Submit the event provided using the streaming socket connection,
        opening it with defaults when it is not already open.
        """
        event = self.marshal_event(event)
        if not self.socket:
            self.open_socket()
        self.socket.send(event)

    @staticmethod
    def marshal_event(event):
        '''
        Marshals the given event into a json string (encoded to bytes on
        Python 3), suitable for passing to an open receivers/stream socket.
        '''
        if not isinstance(event, (str, bytes)):
            event = json.dumps(event, default=json_serial)
        if isinstance(event, str) and sys.version_info >= (3, 0):
            event = event.encode()
        return event

@ -0,0 +1,91 @@
import json
from splunk_instrumentation.constants import INSTRUMENTATION_INDEX_NAME, INSTRUMENTATION_SOURCETYPE, INST_LICENSE_TYPES
from splunk_instrumentation.datetime_util import local, date_to_timestamp_str
from splunk_instrumentation.indexing.event_writer import EventWriter
from splunk_instrumentation.indexing.query_runner import QueryRunner
from datetime import timedelta, datetime, time
# Range-type discriminators (timestamp-based vs date-based query ranges).
# NOTE(review): not referenced within this module's visible code -- verify
# external callers before removing.
RANGE_TYPE_TIMESTAMP = 1
RANGE_TYPE_DATE = 2
class InstrumentationIndex(object):
    """
    Facade over the instrumentation index: reads batches of telemetry
    events (via QueryRunner) and streams new events in (via EventWriter).
    """
    def __init__(self, splunkrc=None, index_name=INSTRUMENTATION_INDEX_NAME,
                 query_runner=None, event_writer=None):
        # query_runner / event_writer parameters allow dependency injection.
        self.index_name = index_name
        if query_runner:
            self.query_runner = query_runner
        else:
            self.query_runner = QueryRunner(splunkrc, self.index_name)
        if event_writer:
            self.event_writer = event_writer
        else:
            self.event_writer = EventWriter(splunkrc, self.index_name)

    # Public API
    # ----------
    def process_new_events(self, start, end, batchNum, callback, visibility=None, time_range=None):
        '''
        Calls `callback` with a list of new events (each parsed from _raw).
        If callback does not throw an exception, the events will no
        longer be "new."
        '''
        # Fix: `visibility` previously defaulted to a shared mutable []
        # (the classic mutable-default pitfall). None now stands in for
        # "all visibility types" and is normalized to [] downstream.
        events = self._query_by_date(start, end, batchNum, visibility or [], time_range)
        results = []
        for event in events:
            results.append(json.loads(event.get('_raw')))
        callback(results)

    def close_connection(self):
        '''
        Closes the event writer's streaming socket, if open.
        '''
        self.event_writer.close_socket()

    def pipe_json(self, event):
        # Streams one event to the index over the writer's socket.
        self.event_writer.submit_via_socket(event)

    # "Private" methods
    # -----------------
    def _query_by_date(self, t_start, t_end, batchNum, visibility, time_limit=None):
        '''
        earliest and latest make the assumption that _telemetry events are
        indexed the day after they happen.
        :param t_start: inclusive start date
        :param t_end: inclusive end date
        :param batchNum: optional batch number filter
        :param visibility: license visibility filters; falsy means all
        :param time_limit: optional explicit {'start', 'stop'} index-time range
        :return: search results iterator
        '''
        search_cmd = 'search index=' + self.index_name
        search_cmd += " sourcetype=" + INSTRUMENTATION_SOURCETYPE + " | spath date | search "
        if time_limit:
            kwargs = {
                "earliest_time": date_to_timestamp_str(time_limit['start']),
                "latest_time": date_to_timestamp_str(time_limit['stop'])
            }
        else:
            # Widen the index-time window one day past t_end (events are
            # indexed the day after they happen).
            kwargs = {
                "earliest_time": date_to_timestamp_str(datetime.combine(t_start, time.min).replace(tzinfo=local)),
                "latest_time": date_to_timestamp_str(datetime.combine(t_end + timedelta(days=1),
                                                                      time.max).replace(tzinfo=local))
            }
        if t_start:
            search_cmd += (' date>=%s' % t_start.strftime("%Y-%m-%d"))
        if t_end:
            search_cmd += (' date<=%s' % t_end.strftime("%Y-%m-%d"))
        if batchNum:
            search_cmd += (' batchNum=%s ' % str(batchNum))
        visibility_cmd = self._get_visibility_cmd(visibility)
        search_cmd += " (%s)" % visibility_cmd
        return self.query_runner.search(search_cmd, **kwargs)

    def _get_visibility_cmd(self, visibility):
        # Falsy visibility means "match any licensed visibility type".
        if not visibility:
            visibility = INST_LICENSE_TYPES
        return " OR ".join(["visibility= *" + str(x) + "*" for x in visibility])

@ -0,0 +1,67 @@
import json
import sys
import xml.dom.minidom as dom

if sys.version_info >= (3, 0):
    from urllib.parse import urlencode
else:
    from urllib import urlencode

from splunk_instrumentation.report import report
from splunk_instrumentation.constants import SPLUNKRC, INST_KV_OWNER, INST_KV_APP, KV_STORE_ENDPOINT, COLLECTION_NAME
from splunk_instrumentation.indexing.base_class import BaseClass
class KvStore(BaseClass):
    """
    Thin wrapper over the splunkd KV store REST endpoints for the
    instrumentation collection.
    """
    def __init__(self, splunkrc=None):
        # Default the namespace to the instrumentation KV owner/app
        # without clobbering explicitly supplied values.
        splunkrc = (splunkrc or SPLUNKRC).copy()
        splunkrc['owner'] = splunkrc.get('owner') or INST_KV_OWNER
        splunkrc['app'] = splunkrc.get('app') or INST_KV_APP
        super(KvStore, self).__init__(splunkrc)
        self.headers = [('content-type', 'application/json')]
        self.service = self._splunkd.service
        self.prepare_collection()

    def to_json(self, data):
        """Serializes data to a JSON string."""
        return json.dumps(data)

    def prepare_collection(self):
        """Reports 'noKVStore' when the expected collection is missing."""
        payload = self.service.request(KV_STORE_ENDPOINT["config"], method="GET", headers=self.headers,
                                       owner=self.splunkrc['owner'], app=self.splunkrc['app'])
        collections = self.parse_collection(payload)
        if COLLECTION_NAME not in collections:
            report.report('noKVStore', True)

    def parse_collection(self, payload):
        """Extracts collection names (Atom <title> elements) from a config response."""
        data = dom.parseString(payload['body'].read())
        return [node.firstChild.data for node in data.getElementsByTagName('title')]

    def create_collection(self, name):
        """Creates a KV store collection; returns False on failure."""
        data = urlencode({"name": name})
        try:
            self.service.request(KV_STORE_ENDPOINT["config"], method="POST", body=data, headers=self.headers,
                                 owner=self.splunkrc['owner'], app=self.splunkrc['app'])
        except Exception:
            return False
        return True

    def set_key(self, key, value):
        """Inserts or (on insert failure) updates the document whose _key is `key`."""
        # _key is unique to each of the entries.
        value = json.dumps({"value": value, "_key": key})
        try:
            self.service.request(KV_STORE_ENDPOINT["document"], method="POST", body=value, headers=self.headers,
                                 owner=self.splunkrc['owner'], app=self.splunkrc['app'])
        except Exception:
            # Insert failed (presumably the key already exists) -- update in place.
            self.service.request(KV_STORE_ENDPOINT["document"] + "/" + key, method="POST", body=value,
                                 headers=self.headers,
                                 owner=self.splunkrc['owner'], app=self.splunkrc['app'])

    def get_key(self, key, default=None):
        """Returns the stored value for `key`, or `default` on any failure."""
        try:
            payload = self.service.request(KV_STORE_ENDPOINT["document"] + "/" + key, method="GET",
                                           headers=self.headers,
                                           owner=self.splunkrc['owner'], app=self.splunkrc['app'])
            # Fix: the body is a stream and must be read; str(payload['body'])
            # produced the object repr, so json.loads always raised and this
            # method always returned `default`. parse_collection() above
            # already reads the body the same way.
            value = json.loads(payload['body'].read())
            return value.get('value')
        except Exception:
            return default

@ -0,0 +1,64 @@
from splunk_instrumentation.splunkd import Splunkd
from splunk_instrumentation.constants import SPLUNKRC, INST_APP_NAME, INSTRUMENTATION_INDEX_NAME
import splunk_instrumentation.splunklib.results as splunklib
import time
import logging
class QueryRunnerResult(splunklib.ResultsReader):
    """
    ResultsReader that also carries a reference to the originating search
    job, so callers can inspect job metadata while iterating results.
    """
    def __init__(self, stream, job=None):
        super(QueryRunnerResult, self).__init__(stream)
        # The search job this result stream came from (set by QueryRunner).
        self.job = job
class QueryRunner(object):
    """ Query Runner.
    A class to handle queries to splunkd.
    It grabs a splunkd object according to the splunkrc params provided:
    - If splunkrc is a dictionary, it will create a new splunkd object.
    - If given any other object type, that object is injected as _splunkd.
    """
    def __init__(self,
                 splunkrc,
                 index_name=INSTRUMENTATION_INDEX_NAME,
                 owner='-',
                 app=INST_APP_NAME, result_reader=QueryRunnerResult):
        """
        :raises Exception: when the target index is not available
        """
        self.splunkrc = splunkrc or SPLUNKRC
        self.result_reader = result_reader
        # isinstance (rather than an exact type check) also accepts dict
        # subclasses -- backward compatible.
        if isinstance(self.splunkrc, dict):
            self._splunkd = Splunkd(**self.splunkrc)
        else:
            self._splunkd = splunkrc
        # Scope all requests to the given owner/app namespace.
        self._splunkd.namespace['owner'] = owner
        self._splunkd.namespace['app'] = app
        if self._splunkd.has_index(index_name):
            self._index = self._splunkd.get_index(index_name)
        else:
            logging.error('ERROR: INDEX IS NOT AVAILABLE')
            # Idiom fix: `raise(Exception(...))` -> `raise Exception(...)`.
            raise Exception("ERROR INDEX UNAVAILABLE")

    def search(self, search_cmd, **kwargs):
        """Submit a new search.
        It is a wrapper to the private method _query.
        """
        return self._query(search_cmd, **kwargs)

    def _query(self, search_cmd, **kwargs):
        """Query.
        Note that earliest is inclusive & latest is exclusive:
        [earliest, latest)
        (Prevents getting last-second events again during the next query)
        """
        job = self._splunkd.search(search_cmd, **kwargs)
        while not job.is_done():
            time.sleep(0.2)
        result = self.result_reader(job.results(count=0))
        # Attach the job when the reader supports it (QueryRunnerResult does).
        if hasattr(result, 'job'):
            result.job = job
        return result

@ -0,0 +1,142 @@
'''
This is the main entry point to scripted inputs to run
checks if this instance should run the app and then runs the app
'''
from __future__ import absolute_import
from splunk_instrumentation.constants import INST_EXECUTION_ID, INST_SCHEMA_FILE, INST_DEBUG_LEVEL
import sys
import logging
from splunk_instrumentation.report import report
from time import sleep
from splunk_instrumentation.schedule_manager import ScheduleManager
from splunk_instrumentation.dataPoints.data_point import dataPointFactory
from splunk_instrumentation.metrics.metrics_schema import load_schema
from splunk_instrumentation.metrics.instance_profile import get_instance_profile, is_lead_node
from splunk_instrumentation.constants import INTROSPECTION_INDEX_NAME
# Route root-logger output to stderr with a simple "LEVEL message" format.
logging.root.setLevel(INST_DEBUG_LEVEL)
formatter = logging.Formatter('%(levelname)s %(message)s')
handler = logging.StreamHandler(stream=sys.stderr)
handler.setFormatter(formatter)
logging.root.addHandler(handler)
# Tag all subsequent report output with this execution's unique id.
report.report('executionID', INST_EXECUTION_ID)
def pre_run(profile):
    '''
    Do some work to keep the environment healthy before collection runs:
    - sync deployment id from the CM to the current node
    - sync salt from the CM to the current node
    - retry transaction if retryTransaction in telemetry.conf is not empty
    :param profile
    :return: None
    '''
    profile.sync_deployment_id()
    profile.sync_salt()
    # TelemetryHandler.cpp SHOULD sync telemetry.conf to the CM whenever any
    # value changes; the retry below covers the case where that failed.
    # Only a single (non-SHC) search head or the SHC captain may retry.
    roles = profile.roles
    is_standalone_search_head = roles.get('search_head') and not roles.get('shc_member')
    if is_standalone_search_head or roles.get('sh_captain'):
        report.report("profile.retry_transaction", True)
        profile.retry_transaction()
def run_phase_1_for_all_nodes(dateRange, schema_file):
    '''
    Phase 1 runs on every node: collects role-based data and indexes it
    into _introspection. Opt-in options are NOT checked in this phase.
    :param dateRange
    :param schema_file
    :return: None
    '''
    report.report('Running_Phase[]', 1)
    # '*' loads the schema with visibility ignored on purpose.
    schema = load_schema(schema_file, '*')
    manager = ScheduleManager(schema, dataPointFactory)
    manager.phase_1(dateRange, INTROSPECTION_INDEX_NAME)
def can_run_phase2(profile):
    '''
    determine if current node can run phase 2
    the requirement is that the current node needs to be the lead node and
    that the deployment is opted-in (profile.visibility is not empty)
    :param profile
    :return: Boolean
    '''
    if is_lead_node(profile.roles) is False:
        report.report("lead node", False)
        return False
    report.report("lead node", True)
    report.report("profile.visibility", profile.visibility)
    if not profile.visibility:
        report.report("not-opted-in", True)
        return False
    if not profile.opt_in_is_up_to_date():
        # Stale opt-in is only reported, not disqualifying.
        report.report("opt-in-out-of-date-license-only", True)
    report.report("profile.cluster_mode", profile.profile.get('cluster_mode'))
    report.report("profile.roles", profile.roles)
    if profile.server_info.get('product_type') == "splunk":
        # NOTE(review): reports the instance type as 'Cloud' when
        # product_type == "splunk" -- confirm that "splunk" is really the
        # product_type cloud instances report before relying on this.
        report.report("instance.type", 'Cloud')
        return False
    return True
def run_phase_2(profile, dateRange, schema_file):
    '''
    Phase 2 runs on the lead node only, and only when the deployment is
    opted in. sm.phase_2() does the following:
    - collects and indexes data points marked as phase = 2
    - queries data collected by phase = 1 and phase = 2 and sends it to splunkx
    :param profile
    :param dateRange
    :param schema_file
    :return: None
    '''
    report.report('Running_Phase[]', 2)
    schema = load_schema(schema_file, profile.visibility)
    manager = ScheduleManager(schema, dataPointFactory)
    sleep(5)
    manager.phase_2(dateRange, INTROSPECTION_INDEX_NAME)
def run_input(dateRange):
    '''
    Runs phase 1 on this node, then phase 2 (plus report send) when this
    node is the opted-in lead node. Errors in either phase are reported
    rather than raised.
    '''
    profile = get_instance_profile()
    pre_run(profile)
    logging.info("INST Started")

    def _run_reporting_errors(phase):
        # Report (never raise) failures so one phase cannot abort the input.
        try:
            phase()
        except Exception as ex:
            report.report('input.error', str(ex))

    _run_reporting_errors(lambda: run_phase_1_for_all_nodes(dateRange, INST_SCHEMA_FILE))
    if can_run_phase2(profile):
        def _phase_2_and_send():
            run_phase_2(profile, dateRange, INST_SCHEMA_FILE)
            report.send()
        _run_reporting_errors(_phase_2_and_send)
    logging.info("INST Done")

@ -0,0 +1,373 @@
import logging
import traceback
import os
import sys
import json
import datetime
from zipfile import ZipFile, ZIP_DEFLATED
from string import Template
import base64
from splunk.persistconn.application import PersistentServerConnectionApplication
import splunk.rest
import splunk.auth
import splunk.entity as en
if sys.version_info >= (3, 0):
from io import BytesIO as ZipIO
else:
from cStringIO import StringIO as ZipIO
# Log to splunk_instrumentation.log under $SPLUNK_HOME/var/log/splunk.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(levelname)s [%(name)s:%(lineno)d] %(message)s',
                    filename=os.path.join(os.environ.get('SPLUNK_HOME'), 'var', 'log', 'splunk',
                                          'splunk_instrumentation.log'),
                    filemode='a')
# logger = logging.getLogger(__name__)
# Unfortunately, __name__ is something like
# pschand__instrumentation_controller__in_C__Program_Files_Splunk_etc_apps_splunk_instrumentation_bin_splunk_instrumentation
# when this script is run by PersistentServerConnectionApplication, so it's hard-coded here.
logger = logging.getLogger('instrumentation_controller')
logger.setLevel(logging.INFO)
if sys.platform == "win32":
    import msvcrt
    # Binary mode is required for persistent mode on Windows.
    msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
    msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
    msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY)
# Make the app's bin directory importable so splunk_instrumentation resolves.
path = os.path.realpath(os.path.dirname(os.path.realpath(__file__)) + '/../../bin')
sys.path.append(path)
try:
import splunk_instrumentation.client_eligibility as client_eligibility
from splunk_instrumentation.service_bundle import ServiceBundle
from splunk_instrumentation.splunkd import Splunkd
import splunk_instrumentation.metrics.instance_profile as si_instance_profile
import splunk_instrumentation.packager as si_packager
except Exception:
raise
class InstrumentationRestHandler(PersistentServerConnectionApplication):
def __init__(self,
             command_line=None,
             command_arg=None,
             entity=None,
             services=None,
             system_services=None,
             packager=None,
             instance_profile=None):
    # entity / services / system_services / packager / instance_profile
    # support dependency injection (e.g. for tests); entity, packager and
    # instance_profile fall back to the real modules when not supplied.
    PersistentServerConnectionApplication.__init__(self)
    self.deploymentID = ''
    self.session = None
    self.server_uri = ''
    self.command_line = command_line
    self.command_arg = command_arg
    self.en = entity or en
    self.services = services
    self.system_services = system_services
    self.packager = packager or si_packager
    self.instance_profile = instance_profile or si_instance_profile
def splunkrc(self):
    # Connection parameters authenticated as the requesting user's session.
    return {
        'token': self.session['authtoken'],
        'server_uri': splunk.rest.makeSplunkdUri()
    }
def system_splunkrc(self):
return {
'token': self.system_authtoken,
'server_uri': splunk.rest.makeSplunkdUri()
}
def parse_arg(self, arg):
try:
arg = json.loads(arg)
except Exception:
raise Exception(["Payload must be a json parsable string"])
return arg
def get_query(self, arg, key):
for value in (arg['query'] or []):
if key == value[0]:
return value[1]
def get_earliest_and_latest(self, **kwargs):
self.assert_earliest_and_latest_provided(**kwargs)
return self.timestamp_to_internal_repr(kwargs.get('earliest'), kwargs.get('latest'))
def assert_earliest_and_latest_provided(self, **kwargs):
if not kwargs.get('earliest') or not kwargs.get('latest'):
raise Exception("earliest and latest query params are required")
def timestamp_to_internal_repr(self, *args):
result = []
for arg in args:
# the arguments passed in are sting with format of <year>-<month>-<day> ex 2016-3-4
# the conversion is done by hand instead of strptime because of the lack of padding
# on date and month
date_array = arg.split("-")
result.append(datetime.date(year=int(date_array[0]), month=int(date_array[1]), day=int(date_array[2])))
if len(result) == 1:
return result[0]
else:
return result
def check_telemetry_authorization(self, path):
# For a free license (where there are no users), there's nothing to check.
if self.services.server_info_service.content.get('isFree', '0') == '1':
return
if self.session is None:
raise splunk.RESTException(500, "No session found.")
logger.debug('username = %s' % self.session['user'])
userentity = self.en.getEntity('authentication/users', self.session['user'],
sessionKey=self.session['authtoken'])
logger.debug('userentity.properties["capabilities"] = %s' % userentity.properties['capabilities'])
if 'edit_telemetry_settings' not in userentity.properties['capabilities']:
logger.error('Access denied for path "%s". Returning 404. Insufficient user permissions' % path)
raise splunk.RESTException(404)
def get_instrumentation_eligibility(self, optInVersion=None, **kwargs):
'''
Determines whether the UI for the instrumentation app should be visible,
including the initial opt-in modal and all settings/logs pages.
This is determined by user capabilities, license type, and server roles.
'''
if self.session is None:
raise splunk.RESTException(500, "No session found.")
result = client_eligibility.get_eligibility(
self.system_services,
username=self.session['user'],
opt_in_version=optInVersion
)
return json.dumps(result)
def response_to_eligibility_request(self, arg):
return {
'payload': self.get_instrumentation_eligibility(**dict(arg['query'])),
'headers': {
'Content-Type': 'application/json'
},
'status': 200
}
def response_to_export_request(self, path, visibility, arg):
self.check_telemetry_authorization(path)
if arg['method'] != 'GET':
return {'payload': 'Only GET is allowed for /%s.' % path, 'status': 405}
usage_data = UsageData(True, visibility, self.splunkrc(), self.packager,
self.instance_profile, **dict(arg['query']))
# Need to do base64 encoding, since zip files are a binary format.
base64_payload = base64.b64encode(usage_data.payload())
if sys.version_info > (3, 0):
base64_payload = base64_payload.decode()
return {
'payload_base64': base64_payload,
'headers': {
'Content-Type': usage_data.content_type(),
'Content-Disposition': 'attachment; filename="%s"' % usage_data.filename()
},
'status': 200
}
def response_to_send_request(self, path, visibility, arg):
self.check_telemetry_authorization(path)
if arg['method'] != 'POST':
return {'payload': 'Only POST is allowed for /%s.' % path, 'status': 405}
usage_data = UsageData(False, visibility, self.splunkrc(), self.packager,
self.instance_profile, **dict(arg['query']))
usage_data.send()
return {
'payload': usage_data.payload(),
'status': 200
}
def handle(self, arg):
'''
Takes the parsed request data passed by splunkd to
PersistentServerConnectionApplication.handle and returns a response.
:param arg: JSON object
:return: JSON object
'''
arg = self.parse_arg(arg)
logger.debug('arg = %s' % json.dumps(arg))
if 'query' not in arg:
arg['query'] = []
try:
if 'session' not in arg:
raise splunk.RESTException(500, "No session found.")
self.session = arg['session']
if 'system_authtoken' not in arg:
raise splunk.RESTException(500, "No system auth token found.")
self.system_authtoken = arg['system_authtoken']
if self.services:
self.splunkd = self.services.splunkd
else:
self.splunkd = Splunkd(**self.splunkrc())
self.services = ServiceBundle(self.splunkd)
if not self.system_services:
splunkd = Splunkd(**self.system_splunkrc())
self.system_services = ServiceBundle(splunkd)
usage_data_endpoint_table = {
'anonymous_usage_data': {'visibility': 'anonymous', 'action': 'export'},
'license_usage_data': {'visibility': 'license', 'action': 'export'},
'support_usage_data': {'visibility': 'support', 'action': 'export'},
'send_anonymous_usage_data': {'visibility': 'anonymous', 'action': 'send'},
'send_license_usage_data': {'visibility': 'license', 'action': 'send'},
'send_support_usage_data': {'visibility': 'support', 'action': 'send'}
}
path = arg['path_info']
if path == 'instrumentation_eligibility':
return self.response_to_eligibility_request(arg)
elif path in usage_data_endpoint_table:
visibility = usage_data_endpoint_table[path]['visibility']
if (usage_data_endpoint_table[path]['action'] == 'export'):
return self.response_to_export_request(path, visibility, arg)
else:
return self.response_to_send_request(path, visibility, arg)
else:
return {
'payload': '"%s" not found' % path,
'status': 404,
'headers': {
'Content-Type': 'text/plain'
}
}
except splunk.RESTException as e:
logger.error(e)
return {'payload': 'Exception caught: %s' % e.msg, 'status': e.statusCode}
except Exception as e:
logger.error('ERROR: ' + traceback.format_exc())
return {'payload': traceback.format_exception_only(type(e), e)[-1], 'status': 500}
class UsageData(object):
    """Builds, exports, or sends telemetry usage data for a date range.

    With forExport=True the payload is a zip archive (bytes) containing a
    single JSON file; otherwise the collected events are kept for sending
    and the payload is a JSON string with the event count.
    """

    # Visibility category -> human-readable label used in export file names.
    _data_types_by_visibility = {
        'anonymous': 'Diagnostic',
        'license': 'License Usage',
        'support': 'Support Usage'
    }

    def __init__(self, forExport, visibility, splunkrc, packager, instance_profile, **kwargs):
        """
        :param forExport: True to build a downloadable zip, False to prepare
            events for sending
        :param visibility: 'anonymous' | 'license' | 'support'
        :param splunkrc: connection parameters dict
        :param packager: module exposing Packager(splunkrc=...)
        :param instance_profile: module exposing get_instance_profile(...)
        :param kwargs: must include 'earliest' and 'latest' (Y-M-D strings)
        :raises splunk.RESTException: 403 when the range exceeds one year
        :raises Exception: when earliest/latest are missing
        """
        self.visibility = visibility
        self.splunkrc = splunkrc
        self.packager = packager
        self.instance_profile = instance_profile
        try:
            self.earliest, self.latest = self.get_earliest_and_latest(**kwargs)
            if self.isMoreThanOneYear():
                logger.error("Date range must be less than 1 year.")
                raise splunk.RESTException(403, "Date range must be less than 1 year.")
            self.events = self.get_events_package(forExport)
            if forExport:
                data_type = UsageData._data_types_by_visibility[self.visibility]
                self.zip_file_name, json_file_name = self.get_file_names(data_type)
                value = self.get_file_content()
                zipped_payload = self.zip_compress(json_file_name, value)
                self._payload = zipped_payload
            else:
                self._payload = '{"sent_count": %d}' % len(self.events)
        except Exception as e:
            # Log with traceback, then let the REST handler shape the response.
            logger.exception(e)
            raise

    def send(self):
        """Send the collected events upstream; a no-op when none were found."""
        if self.events:
            self.send_events_package()

    def get_events_package(self, forExport=False):
        """Query the index for events in [earliest, latest] at this visibility."""
        _packager = self.packager.Packager(splunkrc=self.splunkrc)
        return _packager.build_package(self.earliest, self.latest, [self.visibility], forExport)

    def send_events_package(self):
        """Manually send the already-collected events via the packager."""
        _packager = self.packager.Packager(splunkrc=self.splunkrc)
        _packager.manual_send_package(self.events, self.earliest, self.latest, [self.visibility])

    def get_earliest_and_latest(self, **kwargs):
        """Validate and convert the 'earliest'/'latest' kwargs to dates."""
        self.assert_earliest_and_latest_provided(**kwargs)
        return self.timestamp_to_internal_repr(kwargs.get('earliest'), kwargs.get('latest'))

    def assert_earliest_and_latest_provided(self, **kwargs):
        """Both bounds are mandatory; empty strings are rejected too."""
        if not kwargs.get('earliest') or not kwargs.get('latest'):
            raise Exception("earliest and latest query params are required")

    def timestamp_to_internal_repr(self, *args):
        """Convert "<year>-<month>-<day>" strings (e.g. 2016-3-4) to dates.

        Parsed by hand instead of strptime because the inputs are not
        zero-padded. Returns a single date for one argument, otherwise a
        list in input order.
        """
        result = []
        for arg in args:
            date_array = arg.split("-")
            result.append(datetime.date(year=int(date_array[0]), month=int(date_array[1]), day=int(date_array[2])))
        if len(result) == 1:
            return result[0]
        return result

    def get_file_names(self, data_type, file_type=('zip', 'json')):
        """Return export file names, one per extension in file_type.

        The default is an immutable tuple (was a shared mutable list).
        """
        filename = Template('%s Data - %s to %s.$filename' % (
            data_type,
            ('%d.%02d.%02d' % (self.earliest.year, self.earliest.month, self.earliest.day)),
            ('%d.%02d.%02d' % (self.latest.year, self.latest.month, self.latest.day))
        ))
        return [filename.substitute(filename=ft) for ft in file_type]

    def zip_compress(self, json_file_name, value):
        """Return bytes of a zip archive holding value as json_file_name."""
        temp = ZipIO()
        with ZipFile(temp, 'w', ZIP_DEFLATED) as myzip:
            myzip.writestr(json_file_name, value)
        return temp.getvalue()

    def get_file_content(self):
        """Build the JSON document that is exported inside the zip archive."""
        _packager = self.packager.Packager(splunkrc=self.splunkrc)
        _instance_profile = self.instance_profile.get_instance_profile(splunkrc=self.splunkrc)
        deployment_id = _instance_profile.get_deployment_id()
        transaction_id = _packager.get_transactionID()
        value = self.get_events_package(forExport=True)
        ret_value = {
            "deploymentID": deployment_id,
            "transactionID": transaction_id,
            "data": value}
        return json.dumps(ret_value)

    def isMoreThanOneYear(self):
        """Return True when latest is more than one calendar year past earliest.

        Bug fix: date.replace(year=year + 1) raises ValueError when earliest
        is Feb 29 of a leap year; clamp to Feb 28 of the next year then.
        """
        try:
            one_year_later = self.earliest.replace(year=self.earliest.year + 1)
        except ValueError:
            one_year_later = self.earliest.replace(year=self.earliest.year + 1, day=28)
        return self.latest > one_year_later

    def payload(self):
        """The export zip bytes, or the sent-count JSON string."""
        return self._payload

    def content_type(self):
        """MIME type for the export download."""
        return 'application/zip'

    def filename(self):
        """Name of the zip file offered for download (export mode only)."""
        return self.zip_file_name

@ -0,0 +1,234 @@
"""InstanceProfile class."""
from builtins import object
from splunk_instrumentation.report import report
from splunk_instrumentation.splunklib import data as spldata
from splunk_instrumentation.constants import SPLUNKRC, VISIBILITY_FIELDS_BY_NAME
from splunk_instrumentation.indexing.query_runner import QueryRunner
from splunk_instrumentation.telemetry_conf_service import TelemetryConfService
from splunk_instrumentation.server_info_service import ServerInfoService
from splunk_instrumentation.deployment_id_manager import DeploymentIdManager
from splunk_instrumentation.service_bundle import ServiceBundle
from splunk_instrumentation.salt_manager import SaltManager
class InstanceProfile(object):
    """InstanceProfile.

    This class will retrieve the instance's information.

    self.server_info = server information will be stored here
    self.visibility = visibility information will be stored here
    self.roles = {role_name: True/False} map seeded from server roles and
        augmented with cluster / search-head-cluster probes
    self.profile = raw endpoint payloads keyed by probe name
    """

    def __init__(self, splunkrc=SPLUNKRC, telemetryConfService=None, serverInfoService=None):
        """Constructor.

        It grabs a query_runner object according to the splunkrc params provided:
        - If splunkrc is a dictionary, it will instantiates a new QueryRuner object.
        - If given other object type, it will do Dependency Injection on query_runner
        """
        splunkrc = (splunkrc or SPLUNKRC)
        if type(splunkrc) is dict:
            self.query_runner = QueryRunner(splunkrc)
        else:
            self.query_runner = splunkrc
        self.profile = {}
        self.service = self.query_runner._splunkd.service
        # Conf/info services are injectable for tests; otherwise they are
        # built on the shared service handle and fetched eagerly below.
        if not telemetryConfService:
            self.telemetry_conf_service = TelemetryConfService(self.service)
        else:
            self.telemetry_conf_service = telemetryConfService
        if not serverInfoService:
            self.server_info_service = ServerInfoService(self.service)
        else:
            self.server_info_service = serverInfoService
        self.telemetry_conf_service.fetch()
        self.server_info_service.fetch()
        self.service_bundle = ServiceBundle(self.service,
                                           telemetry_conf_service=self.telemetry_conf_service,
                                           server_info_service=self.server_info_service)
        self.salt_manager = SaltManager(self.service_bundle)
        self.deployment_id_manager = DeploymentIdManager(
            self.service,
            telemetry_conf_service=self.telemetry_conf_service,
            server_info_service=self.server_info_service)
        # Seed the role map from server-info's declared roles.
        self.roles = {role: True for role in self.server_info['server_roles']}
        # gets cluster info from endpoint
        self._load_json({"end_point": "cluster/config/config", "name": "cluster_config"})
        # Valid values: (master | slave | searchhead | disabled)
        # Note that for a searchhead (with SHC or not) the value will be 'disabled' (rather than 'searchhead')
        # if there is no indexer clustering.
        # If call fails set cluster_mode to disabled.
        self.profile['cluster_mode'] = self._nested_get(self.profile, 'cluster_config.entry.content.mode', 'disabled')
        # gets search captain info from endpoint. noProxy is required so that it fails when instance is not the captain
        self._load_json({"end_point": "shcluster/captain/info", "name": "captain_info"}, noProxy=True, default={})
        # if captain/info returns a value it is captain : overwrites server roles
        # this is failing so removing for the time being
        # self.roles['shc_captain'] = bool(self.profile.get('captain_info'))
        # if mode is not disabled then add in_cluster to roles : overwrites server roles
        # Note: 'in_cluster' doesn't mean 'in a cluster', it means 'in a deployment that has indexer clustering'.
        self.roles['in_cluster'] = not self.profile.get('cluster_mode') == 'disabled'
        # overwrites server roles
        self.roles['cluster_master'] = self.profile.get('cluster_mode') == 'master'
        # determines if the current node has lead_role
        self.roles['lead_node'] = self.eval_instance()
        self._get_visibility()

    def eval_instance(self):
        """Classify this instance and decide whether it is the 'lead node'
        (responsible for running telemetry collection).

        Walks an ordered rule table; the first rule whose role requirements
        all hold wins: its label is reported as 'instance.type' and its
        'result' is returned. Returns None (label None reported) when no
        rule matches.
        """
        req_list = [
            {
                "requirements": ['indexer', '!search_peer', '!cluster_slave',
                                 '!shc_member', '!cluster_master',
                                 '!shc_captain', '!cluster_search_head'],
                "label": "Single",
                "result": True
            },
            {
                "requirements": ['cluster_master'],
                "label": "Cluster Master",
                "result": True
            },
            {
                "requirements": ['!cluster_master', 'in_cluster'],
                "label": "Cluster Member not Cluster Master",
                "result": False
            },
            # assume we are already not a cluster member from the above requirements
            {
                "requirements": ['shc_captain'],
                "label": "Search Captain in a non cluster",
                "result": True
            },
            {
                "requirements": ['!cluster_master', 'search_head', '!search_peer',
                                 '!in_cluster', '!cluster_slave', '!shc_member'],
                "label": "Single Search Head",
                "result": True
            },
        ]
        # for-else: the else branch runs only when no rule matched (the loop
        # exits via return on the first match).
        for req in req_list:
            result = evaluate_roles(self.roles, req["requirements"])
            if result:
                report.report("instance.type", req["label"])
                return req["result"]
        else:
            report.report("instance.type", None)

    def opt_in_is_up_to_date(self):
        # Delegates the opt-in version comparison to telemetry.conf state.
        return self.telemetry_conf_service.opt_in_is_up_to_date()

    @property
    def server_info(self):
        # Live view of the fetched server-info content.
        return self.server_info_service.content

    @property
    def server_is_cloud(self):
        # telemetry.conf 'onCloudInstance' coerced to int (0 when missing).
        return int(self.telemetry_conf_service.content.get('onCloudInstance') or 0)

    def retry_transaction(self):
        # Retry a pending cluster-master sync transaction.
        self.telemetry_conf_service.retry_cluster_master_sync_transaction()

    def sync_deployment_id(self):
        self.deployment_id_manager.sync_deployment_id()

    def sync_salt(self):
        self.salt_manager.sync_with_cluster()

    def get_deployment_id(self):
        return self.deployment_id_manager.get_deployment_id()

    def _get_visibility(self):
        # Build the sorted list of enabled visibility names from telemetry.conf
        # flags. While the opt-in is stale, only 'license' may remain enabled.
        self.visibility = []
        for name, field in VISIBILITY_FIELDS_BY_NAME.items():
            if int(self.telemetry_conf_service.content.get(field) or 0):
                self.visibility.append(name)
        if not self.opt_in_is_up_to_date():
            self.visibility = ['license'] if 'license' in self.visibility else []
        self.visibility.sort()

    def _nested_get(self, dic, path, default=0, separator='.'):
        """NestedGet.

        default path separator is .
        default value is 0

        NOTE(review): descends with setdefault, so missing intermediate keys
        are created (mutates dic). The `type(dic) is dict` check treats an
        exact plain dict as "path missing" — presumably because spldata.load
        returns dict subclasses for real payloads; confirm before reusing
        this helper elsewhere.
        """
        keys = path.split(separator)
        for key in keys[:-1]:
            dic = dic.setdefault(key, {})
        if type(dic) is dict:
            return default
        return dic.get(keys[-1])

    def _load_json(self, endpoint, noProxy=False, default={}):
        '''
        calls endpoint['end_point'] and assigns the results to `self.profile[end_point['name']]`
        :param endpoint: dict with 'end_point' (REST path) and 'name' (profile key)
        :param noProxy: append ?noProxy=true so the probe fails locally
            instead of being proxied (used for the captain probe)
        :param default: value stored under the profile key on failure
            NOTE(review): mutable default argument; safe only while callers
            never mutate the stored default.
        :return: True on success, False when the call failed
        '''
        try:
            path = self._construct_path(endpoint, noProxy)
            payload = self.service.http.request(path,
                                                {'method': 'GET',
                                                 'headers': self.service._auth_headers}).get('body')
            if payload:
                result = (spldata.load(payload.read()))
                self.profile[endpoint['name']] = result['feed']
        # often if license does not permit this call it will return a 402 as exception
        except Exception:
            self.profile[endpoint['name']] = default
            return False
        return True

    def _construct_path(self, endpoint, noProxy):
        # Build an absolute splunkd URL in the query-runner's namespace.
        path = self.service.authority \
            + self.service._abspath(endpoint["end_point"], owner=self.query_runner._splunkd.namespace['owner'],
                                    app=self.query_runner._splunkd.namespace['app'])
        if (noProxy):
            path += "?noProxy=true"
        return path
def get_instance_profile(splunkrc=None, telemetryConfService=None, serverInfoService=None):
    """Return the process-wide InstanceProfile, creating it on first use.

    The arguments are consulted only on the first call; every later call
    returns the cached instance regardless of what is passed.
    """
    if not get_instance_profile.instance:
        get_instance_profile.instance = InstanceProfile(
            splunkrc, telemetryConfService, serverInfoService)
    return get_instance_profile.instance
def evaluate_roles(roles, rules):
    """Check a roles mapping against a list of role requirements.

    Each rule is a role name that must be truthy in `roles`; a leading '!'
    negates it (the role must be absent/falsy).

    :param roles: dict mapping role name -> truthy/falsy
    :param rules: iterable of rule strings, e.g. ['indexer', '!shc_member']
    :return: True only when every rule is satisfied (vacuously True for [])
    """
    for rule in rules:
        if rule.startswith("!"):
            # Bug fix: strip only the leading '!' — the old replace('!', '')
            # removed every '!' in the name; startswith also tolerates an
            # empty rule string where rule[0] would raise IndexError.
            if roles.get(rule[1:]):
                return False
        elif not roles.get(rule):
            return False
    return True
def is_lead_node(roles):
    """Return True only when the mapping explicitly marks this node as lead.

    Requires the literal boolean True — other truthy values do not count.
    """
    return roles.get('lead_node') is True
get_instance_profile.instance = None

@ -0,0 +1,151 @@
from __future__ import absolute_import
from past.builtins import basestring
from builtins import object
import json
import os
import uuid
from datetime import datetime
from splunk_instrumentation.metrics.metrics_transforms import transform_object, hash_specific_value_by_key
from splunk_instrumentation.constants import INST_EXECUTION_ID
from splunk_instrumentation.datetime_util import date_to_timestamp, utcNow, str_to_date
from splunk_instrumentation.report import report
from splunk_instrumentation.metrics.instance_profile import get_instance_profile, evaluate_roles
from splunk_instrumentation.salt_manager import SaltManager
from splunk_instrumentation.splunkd import Splunkd
from splunk_instrumentation.service_bundle import ServiceBundle
from splunk_instrumentation.constants import SPLUNKRC
class MetricsCollectionManager(object):
    """Drives telemetry data collection for the classes defined in schema.json.

    Collected results are transformed (value hashing, field filtering) and
    handed to a caller-supplied callback for indexing.
    """

    def __init__(self, metricSchema, dataPointFactory, splunkrc=None, phase=1):
        """
        :param metricSchema: MetricsSchema wrapper over schema.json
        :param dataPointFactory: factory mapping a data-point definition to a
            collector object
        :param splunkrc: connection parameters forwarded to collectors
        :param phase: collection phase used to filter schema classes
        """
        self.metricSchema = metricSchema
        self.dataPointFactory = dataPointFactory
        self.splunkrc = splunkrc
        self.profile = get_instance_profile()
        self.phase = phase
        # A separate system-level connection is used only to obtain the salt
        # applied when hashing sensitive values.
        splunkd = Splunkd(**SPLUNKRC)
        services = ServiceBundle(splunkd)
        salt_manager = SaltManager(services)
        self.salt = salt_manager.get_salt()
        self.scheme = {"hash": self.salt}

    def collect_data(self, dateRange, callback=None):
        '''
        loads all data classes from schema and collects data for yesterday.
        callback will be run after it collects data
        '''
        self._collect_classes_data(dateRange, callback)

    def _collect_classes_data(self, dateRange, callback=None):
        # get all MetricsSchemaClass (one savedsearch definition) from schema.json marked with 'self.phase'
        # (classes missing a 'phase' field default to phase 2).
        classes = self.metricSchema.getEventClassByfield(self.phase, "phase", 2)
        classes_batch = []
        if os.environ.get('RUN_UNSCHEDULE') or dateRange.get("batchNum") is None:
            # No batch filter: run every class for this phase.
            classes_batch = classes
        else:
            # fetch all classes marked with current batch number in schema.json
            # ('*' means run in every batch)
            current_batchNum = dateRange["batchNum"]
            classes_batch = [clsDef for clsDef in classes if clsDef.batch == current_batchNum or clsDef.batch == '*']
        for classDef in classes_batch:
            # Only run classes whose role requirements match this instance.
            rules = classDef.getRoles()
            if evaluate_roles(self.profile.roles, rules):
                self._collect_class_data(classDef, dateRange, callback)

    def _collect_class_data(self, classDef, dateRange, callback=None):
        '''
        Run data collection for one schema class and invoke the callback with
        the transformed results. All exceptions are caught and reported so
        one failing class does not abort the whole run.
        '''
        try:
            if not isinstance(dateRange, dict):
                dateRange = {"start": dateRange}
            dateRange["stop"] = dateRange.get("stop") or dateRange.get("start")
            if isinstance(dateRange["start"], datetime) or isinstance(dateRange["stop"], datetime):
                # Bug fix: previously raised a bare string, which is illegal
                # in Python 3 and surfaced as a confusing "exceptions must
                # derive from BaseException" TypeError; raise a real
                # exception with the intended message instead. It is still
                # caught and reported by the handler below.
                raise TypeError("Requires_date_not_datetime")
            dataPoints = classDef.getDataPoints()
            for dataPoint in dataPoints:
                report.start_profiling()
                dataPointResult = self.collect_data_point(dataPoint, dateRange)
                if hasattr(dataPointResult, 'job'):
                    # Search-backed results expose job statistics; log them
                    # best-effort for introspection.
                    try:
                        report.report('components[]', {
                            "component": classDef.component,
                            "runDuration": float(dataPointResult.job["runDuration"]),
                            "scanCount": int(dataPointResult.job["scanCount"]),
                            "resultCount": int(dataPointResult.job["resultCount"]),
                            "isFailed": dataPointResult.job["isFailed"],
                            "searchProviders": len(dataPointResult.job["searchProviders"]),
                            "sid": dataPointResult.job["sid"]
                        })
                    except Exception:
                        report.report('components[]', {
                            "component": classDef.component,
                            "error": "could not log report"
                        })
                dataPointResult = [
                    self.data_point_results_transform(classDef, event, dateRange) for event in dataPointResult]
                callback(dataPointResult)
        except Exception as e:
            report.report('exceptions[]', str(e))

    def collect_data_point(self, dataPoint, dateRange):
        """Instantiate the collector for one data point and run it."""
        dataPointObj = self.dataPointFactory(dataPoint, options={"splunkrc": self.splunkrc})
        data = dataPointObj.collect(dateRange)
        return data

    def data_point_results_transform(self, class_def, data_point_result, date_range):
        """Normalize one collected event into the telemetry envelope.

        Applies value hashing and field filtering from the class definition,
        then attaches timestamp, date, component, eventID, batchNum,
        visibility, and executionID.
        """
        fields = class_def.index_fields
        hash_key = class_def.getHashKey()
        result = {"data": None}
        if data_point_result['data']:
            if (isinstance(data_point_result['data'], basestring)):
                data = json.loads(data_point_result['data'])
            else:
                data = data_point_result['data']
            data = hash_specific_value_by_key(data=data, hash_key=hash_key, scheme=self.scheme)
            data = transform_object(data=data, fields=fields)
            result['data'] = data
        result['timestamp'] = date_to_timestamp(utcNow())
        result['component'] = class_def.component
        result['date'] = date_range['stop'].isoformat()
        data_point_time = data_point_result.get('_time')
        # For multi-day ranges prefer the event's own date when available.
        if not date_range['stop'] == date_range['start']:
            try:
                if data_point_result.get('date'):
                    result['date'] = data_point_result.get('date')
                elif data_point_time and 'T' in data_point_time:
                    result['date'] = data_point_result.get('_time').split('T')[0]
            except Exception:
                result['date'] = date_range['stop'].isoformat()
        # SPECIAL CASE:
        # At least one (the indexer cluster member count) data point is retrieved
        # on the fly during data collection. Its time/date will be "today" though
        # it really pertains to the stop date (typically yesterday in prod).
        # Note - There is discussion of having each node persist this data so the
        # true historical values can be retrieved later. That would eliminate
        # this situation.
        if str_to_date(result['date']) > date_range['stop']:
            result['date'] = date_range['stop']
        result['eventID'] = str(uuid.uuid4()).upper()
        result['batchNum'] = date_range["batchNum"]
        result['visibility'] = class_def.visibility or "anonymous"
        result['executionID'] = INST_EXECUTION_ID
        return result

@ -0,0 +1,111 @@
from builtins import map
import json
from splunk_instrumentation.constants import DEFAULT_BATCH_NUM
class MetricsSchemaDataPoint(object):
    """Thin wrapper around one data-point definition from schema.json."""

    def __init__(self, dataPointSchema):
        # Raw dict for this data point, as parsed from the schema file.
        self.dataPointSchema = dataPointSchema

    @property
    def type(self):
        """The required 'type' field of the schema entry (KeyError if absent)."""
        return self.dataPointSchema['type']
class MetricsSchemaClass(object):
    """Wrapper around one event-class definition from schema.json."""

    def __init__(self, classSchema):
        # Raw dict for this event class, as parsed from the schema file.
        self.classSchema = classSchema

    @property
    def component(self):
        """Component name (required field; KeyError if absent)."""
        return self.classSchema['component']

    @property
    def visibility(self):
        """Visibility value from the schema, or None when absent."""
        return self.classSchema.get('visibility')

    @property
    def batch(self):
        """Batch number for this class; falls back to DEFAULT_BATCH_NUM.

        (Removed an unused local that re-read the same key.)
        """
        return self.classSchema.get('batch') or DEFAULT_BATCH_NUM

    @property
    def fields(self):
        """Field transform specs; empty list when absent."""
        return self.classSchema.get('fields') or []

    @property
    def on_send(self):
        """Whether this class has send-time processing; False when absent."""
        return self.classSchema.get('on_send') or False

    @property
    def index_fields(self):
        """Field specs applied at indexing time; empty list when absent."""
        return self.classSchema.get('index_fields') or []

    def getDataPoints(self):
        """Wrap each entry of the required 'dataPoints' list."""
        dataPoints = []
        for dataPoint in self.classSchema['dataPoints']:
            dataPoints.append(MetricsSchemaDataPoint(dataPoint))
        return dataPoints

    def getHashKey(self):
        """Return the hash-key names as a list ([] when not configured)."""
        hash_key = self.classSchema.get('hash_key')
        if hash_key is None:
            hash_key = []
        elif not isinstance(hash_key, list):
            hash_key = [hash_key]
        return hash_key

    def getRoles(self):
        """Return the role requirements as a list.

        Default is to run telemetry data collection on the lead node only.
        """
        roles = self.classSchema.get('roles')
        if roles is None:
            roles = ['lead_node']
        elif not isinstance(roles, list):
            roles = [roles]
        return roles
class MetricsDelivery(object):
    """Accessor for the 'delivery' section of schema.json (endpoint info)."""

    def __init__(self, delivery):
        # Raw delivery dict; the url setter mutates it in place.
        self.delivery = delivery

    @property
    def url(self):
        """Delivery endpoint URL, or None when not configured."""
        return self.delivery.get('url')

    @url.setter
    def url(self, url):
        # Overrides the schema-supplied URL (e.g. with the quickdraw URL).
        self.delivery['url'] = url

    @property
    def version(self):
        """Delivery schema/protocol version, or None when absent."""
        return self.delivery.get('version')
class MetricsSchema(object):
    """Top-level wrapper around the parsed schema.json document."""

    def __init__(self, schema, visibility=None):
        # schema: parsed schema.json dict; must contain 'delivery' and 'classes'.
        # visibility: '*' or an iterable of visibility names used to filter
        # classes in getEventClassByfield.
        self.schema = schema
        self.delivery = MetricsDelivery(schema['delivery'])
        self.visibility = visibility

    def getEventClasses(self):
        # Returns a lazy map over all class definitions (a map object on
        # Python 3, not a list).
        classes = self.schema['classes']
        return map(MetricsSchemaClass, classes)

    def getEventClassByfield(self, value, fieldname="component", default=None):
        # Return wrapped class defs whose `fieldname` equals `value` AND whose
        # visibility intersects self.visibility (or self.visibility == '*').
        # NOTE(review): setdefault MUTATES the underlying schema dict — a
        # class missing `fieldname` permanently gains it with `default`, and
        # later calls observe the mutated schema. Confirm no caller depends
        # on this before switching to .get().
        # NOTE(review): assumes self.visibility is iterable when not '*' and
        # that each candidate class has a comma-separated 'visibility'
        # string — verify against schema.json.
        classes = self.schema['classes']
        result = []
        for classDef in classes:
            if classDef.setdefault(fieldname, default) == value and \
                    (self.visibility == '*' or
                     set(self.visibility).intersection(classDef['visibility'].split(','))):
                result.append(MetricsSchemaClass(classDef))
        return result
def load_schema(schema_file, visibility=None):
    """Parse a schema JSON file into a MetricsSchema.

    :param schema_file: path to the schema file; falls back to the relative
        path "schema.json" (resolved against the CWD) when falsy
    :param visibility: visibility filter forwarded to MetricsSchema
    :return: MetricsSchema wrapping the parsed document
    :raises OSError/IOError: when the file cannot be opened
    :raises ValueError: when the file is not valid JSON

    (Removed a dead `schema = None` initializer: if open() raises, the
    exception propagates and the variable was never read.)
    """
    with open(schema_file or "schema.json") as json_file:
        schema = json.load(json_file)
    return MetricsSchema(schema, visibility)

@ -0,0 +1,81 @@
import hashlib
import sys
metrics_transforms = {}
def transform_hash(data, scheme=None):
    """Return the SHA-224 hex digest of str(data) salted with scheme['hash'].

    :param data: any value; its str() form is hashed
    :param scheme: dict carrying the salt under 'hash'; any falsy scheme
        falls back to the fixed salt "default"
    :return: lowercase hex digest string

    NOTE(review): registered in metrics_transforms under both 'sha256' and
    'hash', yet the algorithm is SHA-224 — the key name is kept for schema
    compatibility; changing the algorithm would change all persisted hashes.
    """
    if not scheme:
        scheme = {"hash": "default"}
    salted = str(data) + scheme.get('hash')
    if sys.version_info >= (3, 0):
        # hashlib requires bytes on Python 3.
        salted = salted.encode()
    return hashlib.sha224(salted).hexdigest()
# NOTE(review): 'sha256' is an alias of transform_hash, which actually
# computes SHA-224. The key is kept for schema compatibility — renaming it
# or "fixing" the algorithm would invalidate all previously emitted hashes.
metrics_transforms['sha256'] = transform_hash
metrics_transforms['hash'] = transform_hash
def hash_specific_value_helper(data, each_hash_key, scheme):
    '''
    Recursively walk a nested dict, replacing every value stored under
    `each_hash_key` with its salted hash, in place.

    Only dict values are descended into; lists are left untouched. A dict
    stored under the hash key is recursed rather than hashed.
    '''
    for key, value in list(data.items()):
        if isinstance(value, dict):
            hash_specific_value_helper(value, each_hash_key, scheme)
        elif key == each_hash_key:
            data[key] = transform_hash(value, scheme)
def hash_specific_value_by_key(data, hash_key, scheme):
    '''
    Hash, in place, every value stored under any key listed in hash_key.

    :param data: nested dict of event data (mutated and returned)
    :param hash_key: a list of keys got from schema
    :param scheme: containing hash salt
    :return: the same dict, for chaining
    '''
    for key_name in hash_key:
        hash_specific_value_helper(data, key_name, scheme)
    return data
def metrics_transform(type, value, scheme=None):
    """Apply the named transform to a value, recursing through containers.

    Lists and dicts are transformed element-by-element (mutated in place);
    scalar leaves are passed to the registered transform callable.

    :param type: key into the metrics_transforms registry (shadows the
        builtin `type`; name kept for caller compatibility)
    :param value: scalar, list, or dict to transform
    :param scheme: optional salt scheme forwarded to the transform
    :return: the transformed value
    :raises KeyError: when `type` is not a registered transform
    """
    if isinstance(value, list):
        for idx, item in enumerate(value):
            value[idx] = metrics_transform(type, item, scheme)
    elif isinstance(value, dict):
        for key, item in value.items():
            value[key] = metrics_transform(type, item, scheme)
    else:
        # Bug fix: scheme was threaded through the recursion but dropped at
        # the leaf call, so a caller-supplied salt was silently ignored and
        # the transform always used its default.
        value = metrics_transforms[type](value, scheme)
    return value
def transform_object(data, fields):
    """Apply per-field transforms/renames to an event's data dict, in place.

    Each field spec may carry 'name'/'path', optional 'search_path',
    'set_path', and 'transform' (a metrics_transforms registry key).

    NOTE(review): the specs in `fields` are themselves mutated (path
    defaults are written back), so schema field dicts accumulate state.
    NOTE(review): the presence check `data.get(field['search_path'])` treats
    a dotted path as a literal top-level key, while nested_get/nested_set
    walk the dots — genuinely nested values are skipped unless a flat dotted
    key also exists. Confirm intended semantics before changing.
    """
    def nested_set(dic, path, value):
        # Walk/create intermediate dicts, then assign the leaf.
        keys = path.split(".")
        for key in keys[:-1]:
            dic = dic.setdefault(key, {})
        dic[keys[-1]] = value

    def nested_get(dic, path):
        # Walk intermediate dicts (creating them via setdefault) and read
        # the leaf; raises KeyError when the leaf is absent.
        keys = path.split(".")
        for key in keys[:-1]:
            dic = dic.setdefault(key, {})
        return dic[keys[-1]]
    for field in fields:
        field['path'] = field.get('path') or field.get('name')
        field['search_path'] = field.get('search_path') or field['path']
        field['set_path'] = field.get('set_path') or field['path']
        if data.get(field['search_path']) is not None:
            value = nested_get(data, field['search_path'])
            if field.get('transform'):
                value = metrics_transform(field['transform'], value)
            nested_set(data, field['set_path'], value)
    return data

@ -0,0 +1,257 @@
from splunk_instrumentation.indexing.instrumentation_index import InstrumentationIndex
from splunk_instrumentation.packager.send_log import SendLog
from splunk_instrumentation.packager.send_data import SendData
import random
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime, timedelta, time
from splunk_instrumentation.datetime_util import local, utc, utcNow, json_serial
from splunk_instrumentation.metrics.instance_profile import get_instance_profile
import splunk_instrumentation.metrics.metrics_schema as metrics_schema
from splunk_instrumentation.constants import INST_SCHEMA_FILE, INST_EXECUTION_START_TIME
from splunk_instrumentation.packager.quick_draw import get_quick_draw
from splunk_instrumentation.report import report
from splunk_instrumentation.metrics.metrics_transforms import transform_object
from splunk_instrumentation.dataPoints.data_point import dataPointFactory
from splunk_instrumentation.dataPoints.spl_data_point import SPLDataPoint # noqa
from splunk_instrumentation.dataPoints.report_data_point import ReportDataPoint # noqa
from splunk_instrumentation.splunklib import binding
from splunk_instrumentation.constants import INTROSPECTION_INDEX_NAME
import os
import json
logger = logging.getLogger(__name__)
# Dedicated logger for telemetry payloads on cloud stacks: events are written
# to splunk_instrumentation_cloud.log rather than sent to quickdraw.
dataLogger = logging.getLogger('TelemetryCloudData')
dataLogger.setLevel(logging.INFO)
# Rotate at ~5 MB, keeping 5 backups. Assumes SPLUNK_HOME is set
# (os.environ.get returns None otherwise and os.path.join would raise).
handler = RotatingFileHandler(os.path.join(os.environ.get('SPLUNK_HOME'), 'var',
                              'log', 'splunk', 'splunk_instrumentation_cloud.log'),
                              mode='a', maxBytes=5000000, backupCount=5)
# Each record renders as a JSON object; %(message)s must itself be valid JSON.
formatter = logging.Formatter('{"datetime": "%(asctime)s", "log_level": "%(levelname)s", '
                              '"component": "%(name)s", "data": %(message)s}')
handler.setFormatter(formatter)
dataLogger.addHandler(handler)
class Packager(object):
""" Packager Class.
This class acts as the gateaway for the data.
"""
    def __init__(
            self, splunkrc=None, deploymentID=None, schema=None, factory=None,
            send_data=None, send_log=None, instance_profile=None,
            quick_draw=None):
        """Wire up schema, delivery endpoint, senders, and instance profile.

        Every collaborator is injectable for testing; by default each is
        constructed from the instance profile, schema.json, and quickdraw.
        """
        self._splunkrc = splunkrc
        self.deploymentID = deploymentID
        self.transaction_id = None
        if not instance_profile:
            self.instance_profile = get_instance_profile(self._splunkrc)
        else:
            self.instance_profile = instance_profile
        if not schema:
            schema = metrics_schema.load_schema(INST_SCHEMA_FILE, visibility=self.instance_profile.visibility)
        self.schema = schema
        if not factory:
            factory = dataPointFactory
        self.factory = factory
        self.deliverySchema = self.schema.delivery
        # quickdraw supplies the current delivery URL; when unavailable the
        # schema's static URL is kept.
        if not quick_draw:
            qd = get_quick_draw()
        else:
            qd = quick_draw
        if qd:
            self.deliverySchema.url = qd.get('url')
        self.transaction_id = self.get_transactionID()
        if not send_data:
            self.sd = SendData(
                deploymentID=self.instance_profile.get_deployment_id(),
                deliverySchema=self.deliverySchema,
                transaction_id=self.get_transactionID())
        else:
            self.sd = send_data
        if not send_log:
            self.sl = SendLog(splunkrc=self._splunkrc)
        else:
            self.sl = send_log
        self.result = None
        self.is_cloud = self.instance_profile.server_is_cloud
    def package_send(self, dateRange, index_name=INTROSPECTION_INDEX_NAME):
        """Auto send and log data.

        First we look at our index and check the start, stop, and visibility
        Next we query based on that, and send it.

        Returns False when nothing is eligible to send (no visibility or no
        events); otherwise returns the result of _send_package.
        """
        visibility = self.instance_profile.visibility
        # NOTE(review): visibility is normally a list; this `is False` guard
        # only fires if something upstream stored the literal False. An
        # empty list falls through and is stopped by the len(events) check.
        if visibility is False:
            return False
        time_range = {"start": INST_EXECUTION_START_TIME, "stop": utcNow()}
        events = self._query_events(
            dateRange['start'], dateRange['stop'], dateRange['batchNum'], visibility, False, time_range=time_range, index_name=index_name)
        if len(events) == 0:
            report.report('send-canceled', True)
            return False
        return self._send_package(events, dateRange['start'], dateRange['stop'], dateRange['batchNum'], time_range=time_range)
    def manual_send_package(self, events, start, stop, visibility):
        """Handling manually sending package from the UI.

        This is just a wrapper for _send_package
        events = events from index
        start = from datetime picker
        stop = from datetime picker
        visibility = [anonymous, license]

        NOTE(review): _send_package declares batchNum as its 4th required
        positional parameter, but this call does not supply it — as written
        the call raises TypeError. Confirm the intended batch value (None?)
        and fix either the call or the signature.
        """
        # Expand the date-picker bounds to full local days (the stop bound is
        # pushed one day forward — presumably to cover timezone skew; verify)
        # and convert to UTC for the index query.
        time_range = {
            "start": datetime.combine(start, time.min).replace(tzinfo=local).astimezone(utc),
            "stop": datetime.combine(stop + timedelta(days=1), time.max).replace(tzinfo=local).astimezone(utc)
        }
        return self._send_package(
            events, start, stop, method='manual', visibility=visibility, time_range=time_range)
    def build_package(self, start, stop, visiblity, forExport=False):
        """Query events for export/manual-send without sending them.

        NOTE(review): the arguments are forwarded positionally into
        _query_events(start, stop, batchNum, visibility, ...), so `visiblity`
        lands in the batchNum slot and `forExport` in the visibility slot.
        Callers (UsageData) currently rely on this exact call — verify
        against InstrumentationIndex.process_new_events before "fixing".
        (Parameter name 'visiblity' [sic] kept for keyword-call compatibility.)
        """
        return self._query_events(
            start, stop, visiblity, forExport)
def get_transactionID(self):
if self.transaction_id:
return self.transaction_id
allowedCharacters = '0123456789ABCDEF'
transaction_id = ''.join(random.choice(allowedCharacters) for i in range(8)) + '-' + ''.join(
random.choice(allowedCharacters) for i in range(4)) + \
'-' + ''.join(random.choice(allowedCharacters) for i in range(4)) + \
'-' + ''.join(random.choice(allowedCharacters) for i in range(4)) + \
'-' + ''.join(random.choice(allowedCharacters) for i in range(12))
self.transaction_id = transaction_id
return self.transaction_id
def _get_visibility(self, events):
result = {}
i = get_instance_profile(self._splunkrc)
visibility = i.visibility
for event in events:
vis = event.get('visibility') or []
for key in vis.split(','):
if key in visibility:
result[key] = True
return sorted(result.keys())
def _query_events(
self, start, stop, batchNum,
visibility=[], forExport=False, time_range=None, index_name=INTROSPECTION_INDEX_NAME):
'''
:param start: datetime.date
:param stop: datetime.date can be the same as start
:param batchNum: batch number
:param visibility:
:param forExport: true if this is for export and forces visibility values to visibility field on events
:param time_range: {start,stop} the timecode range to limit event _time
:param index_name: specifies which index to query for telemetry events (default: _introspection)
:return:
'''
if isinstance(start, datetime) or isinstance(stop, datetime):
raise ("Requires_date_not_datetime")
i = InstrumentationIndex(splunkrc=self._splunkrc, index_name=index_name)
result = []
def process_events(events):
for data in events:
self._transform_data(data)
result.append(data)
profile = report.start_profiling()
i.process_new_events(
start, stop, batchNum, process_events, visibility=visibility, time_range=time_range)
report.report("query_telemetry", {"count": len(result)}, profile)
if forExport:
result = self._mark_visibility(result, visibility, 'manual')
return result
def _transform_data(self, data):
classDef = self.schema.getEventClassByfield(data['component'])
if (len(classDef)):
data['data'] = transform_object(data['data'], classDef[0].fields)
return data
def _send_package(
self, events, start, stop, batchNum, method='auto', visibility=None, time_range=None):
"""Sending package and log it.
If offline (or quickdraw not available), log failed to the index.
If on cloud, log events to splunk_instrumentation_cloud.log, instead of sending to quickdraw
events = events from index
start = from datetime picker
stop = from datetime picker
batchNum = batch number
method = ['auto', 'manual']
visibility = [anonymous, license]
"""
visibility = visibility or self._get_visibility(events)
count = len(events)
self.sl.send_attempted(start, stop, batchNum, visibility=visibility, time_range=time_range, method=method, count=count)
try:
events = self._mark_visibility(events, visibility, method)
if self.is_cloud:
self.sd.bundle_DTOs(events)
for event in events:
dataLogger.info(json.dumps(event, default=json_serial))
else:
if self.deliverySchema.url:
self.sd.send_data(events)
else:
raise Exception('Quickdraw is not available')
self.sl.send_completed(start, stop, batchNum, visibility=visibility, time_range=time_range, method=method,
count=count)
except binding.HTTPError as e:
logger.error(e)
self.sl.send_failed(start, stop, batchNum, visibility=visibility, time_range=time_range, method=method, count=None)
report.report("send_failed", True)
raise
except Exception as e:
logger.error(e)
self.sl.send_failed(start, stop, batchNum, visibility=visibility, time_range=time_range, method=method, count=None)
report.report("send_failed", True)
raise
except Exception:
logger.error("Unknown Error")
self.sl.send_failed(start, stop, batchNum, visibility=visibility, time_range=time_range, method=method, count=None)
report.report("send_failed", True)
raise
def _mark_visibility(self, events, visibility, method='auto'):
"""Marking visibility.
It alters the visibility field according to their choice from the UI
events = events from index
visibility = [anonymous, license] from UI
"""
if method == 'manual':
for event in events:
event['visibility'] = ','.join(visibility)
elif method == 'auto':
for event in events:
temp = []
for vis in event['visibility'].split(','):
if vis in visibility:
temp.append(vis)
event['visibility'] = ','.join(temp)
else:
raise Exception("Should never reach this.")
return events

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save