You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

168 lines
6.3 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import sys
import time as time_import
import splunk
from splunk.clilib.bundle_paths import make_splunkhome_path
sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib', 'SA_ITOA_app_common']))
from itsi.itsi_utils import ITOAInterfaceUtils
from SA_ITOA_app_common.solnlib.modular_input import event_writer
from ITOA.itoa_common import is_feature_enabled, get_hec_token, get_hec_uri
from ITOA.setup_logging import logger
from .scs.constants import RE_QUEUE_MODE_FEATURE_FLAG
class PushEventManager(object):
"""
Push Event to any index using http listener. As part of instance creation
it enable Http Listener if it is disabled and acquire token for given name and settings
Use push_event function to push event to given index
"""
def __init__(self, session_key, token_name, hec_token=None, hec_uri=None):
"""
Initialize token settings
@type token_name: basestring
@param token_name: token_name
@type hec_token: basestring
@param hec_token: the hec token to use to communicate with HEC, if unprovided it will be fetched via REST
@type hec_uri: basestring
@param hec_uri: the uri to use to communicate with HEC, if unprovided it will be fetched via REST
@rtype: object
@return:
"""
if not hec_uri:
hec_uri = get_hec_uri(session_key)
if not hec_token:
hec_token = get_hec_token(session_key, token_name)
splunkd_scheme = splunk.getDefault('protocol')
splunkd_host, splunkd_port = ITOAInterfaceUtils.get_splunk_host_port()
# Passing in the hec and splunkd information prevents the doing of unncessary requests/subprocesses
self.push_event_object = event_writer.HECEventWriter(token_name, session_key, scheme=splunkd_scheme,
host=splunkd_host, port=splunkd_port, hec_uri=hec_uri,
hec_token=hec_token)
self.is_queue_mode_enabled = is_feature_enabled(RE_QUEUE_MODE_FEATURE_FLAG, session_key)
def get_events_to_push(self, events, time, source, sourcetype, host, index):
"""
Given a list of raw events and indexing specific details, return a list
that can further be fed to the HTTP Event Collector.
See: http://dev.splunk.com/view/event-collector/SP-CAAAE6M
@type self: PushEventManager
@param events: list of raw event dictionaries
@type events: list of dict
@param time: event time
@param source: event source
@param sourcetype: event sourcetype
@param host: event host
@param index: index to write to
@return a dictionary that the HEC understands
"""
if not isinstance(events, list):
raise TypeError('`events` is not a valid list. Received type=`%s`.' % type(events).__name__)
# remove keys pre-hec. w/o this we generate multi-valued fields for keys
# which craps out the rules engine grouping algorithm.
prepped = []
pop_keys = ('source', 'sourcetype', '_time', 'host', 'index')
for ev in events:
host = ev.pop('event_host', None) if ev.get('event_host', None) else host
source = ev.pop('event_source', None) if ev.get('event_source', None) else source
ev_time = ev.get('_time', time)
if not ev_time:
ev_time = time_import.time()
for k in pop_keys:
ev.pop(k, None)
prepped.append(self.push_event_object.create_event(ev, float(ev_time), index, host, source, sourcetype))
return prepped
def push_event(self, event, time=None, source=None, sourcetype=None, host=None, index=None):
"""
Push event to index
@type self: PushEventManager
@type event: dict
@param event: even to push
ideally consumer of this function should pass event
with time, host, source and sourcetype. Beside this 'event'
must in the json form
for example:
{
time: <epoch time>,
host: <host>, source:<source>,
sourcetype:<sourcetype>,
event: <json event>
}
See:
http://dev.splunk.com/view/event-collector/SP-CAAAE6M
NOTE:
++++++++++++++
if host, source or sourcetype is not assigned then default values is being used which is
assigned to token
if time is not specified then index time would be event time
++++++++++++++
@param time: event time
@param source: event source
@param sourcetype: event sourcetype
@param host: event host
@param index: index to write to
@return: tuple (response, content)
"""
events_to_push = self.get_events_to_push([event], time, source, sourcetype, host, index)
return self.push_event_object.write_events(events_to_push)
def push_events(self, events, source=None, sourcetype=None, host=None, index=None):
"""
Push events to index.
@type self: PushEventManager
@type events: list
@param events: events to push
ideally consumer of this function should pass each event
with time, host, source and sourcetype. Beside this 'event'
must in the json form
for example:
{
time: <epoch time>,
host: <host>, source:<source>,
sourcetype:<sourcetype>,
event: <json event>
}
See:
http://dev.splunk.com/view/event-collector/SP-CAAAE6M
NOTE:
++++++++++++++
- if host, source or sourcetype is not assigned then default
values is being used which is assigned to token
- We expect each event to have its own _time set here.
++++++++++++++
@param source: event source
@param sourcetype: event sourcetype
@param host: event host
@param index: index to write to
@return: tuple (response, content)
"""
events_to_push = self.get_events_to_push(events, None, source, sourcetype, host, index)
return self.push_event_object.write_events(events_to_push)