You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
168 lines
6.3 KiB
168 lines
6.3 KiB
# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
|
|
|
|
import sys
|
|
|
|
import time as time_import
|
|
|
|
import splunk
|
|
|
|
from splunk.clilib.bundle_paths import make_splunkhome_path
|
|
|
|
sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib', 'SA_ITOA_app_common']))
|
|
|
|
from itsi.itsi_utils import ITOAInterfaceUtils
|
|
from SA_ITOA_app_common.solnlib.modular_input import event_writer
|
|
from ITOA.itoa_common import is_feature_enabled, get_hec_token, get_hec_uri
|
|
from ITOA.setup_logging import logger
|
|
from .scs.constants import RE_QUEUE_MODE_FEATURE_FLAG
|
|
|
|
|
|
class PushEventManager(object):
    """
    Push events to a Splunk index through the HTTP Event Collector (HEC).

    On construction the HEC uri and token are resolved (unless supplied by the
    caller) and an ``HECEventWriter`` is created. Use ``push_event`` or
    ``push_events`` to send events, or ``get_events_to_push`` to only build
    the HEC payloads.
    """

    # Indexing metadata keys stripped from every event body before it is sent.
    # Leaving them in produces multi-valued fields, which breaks the rules
    # engine grouping algorithm.
    _RESERVED_KEYS = ('source', 'sourcetype', '_time', 'host', 'index')

    def __init__(self, session_key, token_name, hec_token=None, hec_uri=None):
        """
        Initialize token settings and the underlying HEC event writer.

        @type session_key: basestring
        @param session_key: splunkd session key

        @type token_name: basestring
        @param token_name: name of the HEC token

        @type hec_token: basestring
        @param hec_token: the hec token to use to communicate with HEC, if
            not provided it will be fetched via REST

        @type hec_uri: basestring
        @param hec_uri: the uri to use to communicate with HEC, if not
            provided it will be fetched via REST
        """
        if not hec_uri:
            hec_uri = get_hec_uri(session_key)
        if not hec_token:
            hec_token = get_hec_token(session_key, token_name)

        splunkd_scheme = splunk.getDefault('protocol')
        splunkd_host, splunkd_port = ITOAInterfaceUtils.get_splunk_host_port()

        # Passing in the hec and splunkd information prevents the writer from
        # doing unnecessary requests/subprocesses to discover them itself.
        self.push_event_object = event_writer.HECEventWriter(
            token_name,
            session_key,
            scheme=splunkd_scheme,
            host=splunkd_host,
            port=splunkd_port,
            hec_uri=hec_uri,
            hec_token=hec_token,
        )
        self.is_queue_mode_enabled = is_feature_enabled(RE_QUEUE_MODE_FEATURE_FLAG, session_key)

    def get_events_to_push(self, events, time, source, sourcetype, host, index):
        """
        Given a list of raw events and indexing specific details, return a
        list of HEC events that can be fed to the HTTP Event Collector.
        See: http://dev.splunk.com/view/event-collector/SP-CAAAE6M

        Per-event resolution:
          - host / source: a truthy ``event_host`` / ``event_source`` key in
            the event wins (and is removed from the event); otherwise the
            ``host`` / ``source`` argument is used. An override only applies
            to the event that carries it.
          - time: the event's ``_time`` if the key is present, else ``time``;
            if the result is falsy the current wall-clock time is used.

        NOTE: the event dictionaries are modified in place (reserved keys and
        consumed overrides are removed).

        @type self: PushEventManager

        @param events: list of raw event dictionaries
        @type events: list of dict
        @param time: default event time
        @param source: default event source
        @param sourcetype: event sourcetype
        @param host: default event host
        @param index: index to write to

        @rtype: list
        @return: one object per input event, as built by
            ``HECEventWriter.create_event``

        @raise TypeError: if ``events`` is not a list
        """
        if not isinstance(events, list):
            raise TypeError('`events` is not a valid list. Received type=`%s`.' % type(events).__name__)

        create_event = self.push_event_object.create_event
        prepped = []
        for ev in events:
            # Resolve overrides into per-event locals. Re-binding the `host` /
            # `source` arguments here would leak one event's override into
            # every following event that has none.
            ev_host = ev.pop('event_host') if ev.get('event_host') else host
            ev_source = ev.pop('event_source') if ev.get('event_source') else source

            ev_time = ev.get('_time', time)
            if not ev_time:
                ev_time = time_import.time()

            # Remove keys pre-hec; see _RESERVED_KEYS for the reason.
            for key in self._RESERVED_KEYS:
                ev.pop(key, None)

            prepped.append(create_event(ev, float(ev_time), index, ev_host, ev_source, sourcetype))
        return prepped

    def push_event(self, event, time=None, source=None, sourcetype=None, host=None, index=None):
        """
        Push a single event to an index.

        @type self: PushEventManager

        @type event: dict
        @param event: event to push

            ideally consumer of this function should pass event
            with time, host, source and sourcetype. Besides this, 'event'
            must be in json form
            for example:
            {
                time: <epoch time>,
                host: <host>, source:<source>,
                sourcetype:<sourcetype>,
                event: <json event>
            }
            See:
            http://dev.splunk.com/view/event-collector/SP-CAAAE6M

            NOTE:
            ++++++++++++++
            - host, source, sourcetype and index are handed to the HEC event
              as given (None when not assigned); presumably the defaults
              assigned to the token then apply -- confirm against HEC docs
            - if neither the event's `_time` nor `time` is set, the current
              time is used as event time
            ++++++++++++++

        @param time: event time
        @param source: event source
        @param sourcetype: event sourcetype
        @param host: event host
        @param index: index to write to
        @return: the result of ``HECEventWriter.write_events``
        """
        events_to_push = self.get_events_to_push([event], time, source, sourcetype, host, index)
        return self.push_event_object.write_events(events_to_push)

    def push_events(self, events, source=None, sourcetype=None, host=None, index=None):
        """
        Push a list of events to an index.

        @type self: PushEventManager

        @type events: list
        @param events: events to push

            ideally consumer of this function should pass each event
            with time, host, source and sourcetype. Besides this, 'event'
            must be in json form
            for example:
            {
                time: <epoch time>,
                host: <host>, source:<source>,
                sourcetype:<sourcetype>,
                event: <json event>
            }
            See:
            http://dev.splunk.com/view/event-collector/SP-CAAAE6M

            NOTE:
            ++++++++++++++
            - host, source, sourcetype and index are handed to the HEC event
              as given (None when not assigned); presumably the defaults
              assigned to the token then apply -- confirm against HEC docs
            - We expect each event to have its own _time set here; events
              without one get the current time.
            ++++++++++++++

        @param source: event source
        @param sourcetype: event sourcetype
        @param host: event host
        @param index: index to write to
        @return: the result of ``HECEventWriter.write_events``
        @raise TypeError: if ``events`` is not a list
        """
        events_to_push = self.get_events_to_push(events, None, source, sourcetype, host, index)
        return self.push_event_object.write_events(events_to_push)
|