You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
588 lines
20 KiB
588 lines
20 KiB
# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
|
|
'''
|
|
This module handles getting results in and out of the splunk search protocol
|
|
For most things, you'll want read_chunk and write_chunk
|
|
'''
|
|
import sys
|
|
import re
|
|
import json
|
|
import abc
|
|
import time
|
|
import os
|
|
import csv
|
|
import splunk.rest as rest
|
|
|
|
from queue import Queue, Empty
|
|
from threading import Thread
|
|
from .setup_logging import ItsiLogger, SEARCH_LOG_FILE, getLogger
|
|
from ITOA.itoa_common import get_csv_dict_writer
|
|
|
|
"""
|
|
If you are reading or writing binary data, such as an image, under Windows,
|
|
the file must be opened in binary mode.
|
|
But unix does not make a distinction between text and binary modes
|
|
"""
|
|
if sys.platform == "win32":
|
|
import msvcrt
|
|
msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
|
|
msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY)
|
|
msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
|
|
|
|
|
|
# The conf customsearch stanza (in itsi_settings.conf)
CONF_CUSTOMSEARCH_STANZA = 'customsearch'

# Setting name for the chunk-read timeout inside the customsearch stanza
CONF_CUSTOMSEARCH_SETTING_TIMEOUT_READ = 'timeout_read'

# The default customsearch read timeout (in seconds), used when the conf
# setting is missing or unparsable
CONF_CUSTOMSEARCH_SETTING_TIMEOUT_READ_DEFAULT = 60
|
|
|
|
|
|
def get_itsi_conf_setting(session_key, stanza, setting, logger):
    """
    Read a single setting from itsi_settings.conf via the splunkd REST API.

    @param session_key: the session key
    @type session_key: str
    @param stanza: the conf stanza
    @type stanza: str
    @param setting: the conf setting
    @type setting: str
    @param logger: logger used to record any failure
    @type logger: object
    @returns: the conf setting value, or None when absent or on any error
    @rtype: any
    """
    endpoint = '/servicesNS/nobody/SA-ITOA/configs/conf-itsi_settings/' + stanza
    try:
        response, content = rest.simpleRequest(
            endpoint,
            sessionKey=session_key,
            getargs={ 'output_mode': 'json' }
        )
        if response.status == 200:
            # Scan the returned entries for the one matching the stanza name
            # and return its requested setting.
            for entry in json.loads(content).get('entry'):
                if entry.get('name') == stanza:
                    return entry.get('content', {}).get(setting)
    except Exception as e:
        # Best-effort read: any failure is logged and treated as "not set".
        logger.exception(e)

    return None
|
|
|
|
|
|
class SearchChunkProtocol(metaclass=abc.ABCMeta):
    """
    New Search Chunk Protocol 1.0 follows the below steps

    1. splunk send action type "getinfo" with header which hold meta information
    2. Command need to send back method with information like type of search
    3. Once initial hand shake is done, Splunk send data in chunks, search command process it and
       send data back to splunk with {"finished":false}. This can be infinite times
    4. When search is done, it need to send { "finished": true } to splunk to finalize the search

    Subclasses must implement run(); pre_processing/post_processing/
    validate_search_args are optional hooks.
    """

    # Default protocol streams; class-level so tests/subclasses can substitute
    # their own file-like objects.
    stdin = sys.stdin
    stdout = sys.stdout
    stderr = sys.stderr
|
|
|
|
def __init__(self, output_meta_data, is_send_ack=True, logger=None):
|
|
"""
|
|
Initialize. It negotiate getinfo exchange fully and defined search command argument and session key
|
|
self.session_key
|
|
self.args
|
|
|
|
@type output_meta_data: dict
|
|
@param output_meta_data: output meta data which need to be send during getinfo exchange phase
|
|
this dict must contain type ( possible values of type would be ['streaming', 'stateful', 'events',
|
|
'reporting']
|
|
|
|
@type is_send_ack: bool
|
|
@param is_send_ack: set this flag to avoid sending write back for get info handshake, if this is false, then
|
|
write_chunk need to be called to complete handshake
|
|
|
|
@type logger: object
|
|
@param logger: if logger specified then overwrite the class logger
|
|
@return:
|
|
"""
|
|
if not (isinstance(output_meta_data, dict) and 'type' in list(output_meta_data.keys())):
|
|
raise AttributeError("Type is not defined in out_meta_data to do get-exchange phase.")
|
|
|
|
if output_meta_data.get('type') not in ['streaming', 'stateful', 'events', 'reporting']:
|
|
raise ValueError('Invalid type value')
|
|
|
|
self.logger = logger if logger else getLogger()
|
|
|
|
self.timeout_read = CONF_CUSTOMSEARCH_SETTING_TIMEOUT_READ_DEFAULT
|
|
|
|
# Phase 0 getinfo exchange phase
|
|
ret = self.read_getinfo(sys.stdin)
|
|
if not ret:
|
|
raise ValueError("Getinfo exchanges does not contain meta data")
|
|
|
|
metadata, body = ret
|
|
|
|
# Store earliest time
|
|
# let's try to get it at the top level, but I don't think this is ever there...
|
|
earliest_time = metadata.get('earliest_time')
|
|
|
|
if metadata.get('searchinfo'):
|
|
searchinfo = metadata.get('searchinfo', {})
|
|
if not earliest_time:
|
|
earliest_time = searchinfo.get('earliest_time')
|
|
self.session_key = searchinfo.get('session_key')
|
|
self.logger = ItsiLogger(os.path.join(searchinfo.get('dispatch_dir'), SEARCH_LOG_FILE),
|
|
searchinfo.get('command'))
|
|
self.args = {}
|
|
args = metadata.get('searchinfo', {}).get('args')
|
|
# Convert Array to key value
|
|
for arg in args:
|
|
if arg.find("=") != -1:
|
|
key, value = arg.split("=")
|
|
self.logger.debug("Adding search command argument key=%s, value=%s", key, value)
|
|
self.args[key] = value
|
|
else:
|
|
self.logger.warning("Invalid argument, arg=%s", arg)
|
|
|
|
# The timeout (in seconds) when attempting to read a chunk
|
|
self.timeout_read = self.fetch_setting_timeout_read()
|
|
|
|
if not earliest_time:
|
|
self.logger.warning("Earliest time (earliest_time) is undefined or zero")
|
|
earliest_time = time.time()
|
|
self.earliest_time = float(earliest_time)
|
|
# Send output
|
|
# Make sure finished is defined
|
|
if 'finished' not in list(output_meta_data.keys()):
|
|
output_meta_data.setdefault('finished', False)
|
|
|
|
# Class validate function to validate search command arguments, if it fails, it should return message
|
|
is_valid, inspector_msgs = self.validate_search_args()
|
|
|
|
if not is_valid:
|
|
if isinstance(inspector_msgs, list):
|
|
self.exit_with_error(output_meta_data, inspector_msgs)
|
|
else:
|
|
self.exit_with_error(output_meta_data, "Invalid arguments in search command")
|
|
else:
|
|
if is_send_ack:
|
|
self.write_chunk(output_meta_data, '')
|
|
|
|
self.writer = None
|
|
self.output_buf = None
|
|
self.logger.debug('Search Chunk Protocol has initialized successfully.')
|
|
|
|
def fetch_setting_timeout_read(self):
|
|
"""
|
|
Fetches and returns the timeout conf setting that's used when reading a chunk
|
|
|
|
@type: int
|
|
@return: return the timeout value
|
|
"""
|
|
try:
|
|
value = int(get_itsi_conf_setting(self.session_key, CONF_CUSTOMSEARCH_STANZA, CONF_CUSTOMSEARCH_SETTING_TIMEOUT_READ, logger=self.logger))
|
|
except Exception:
|
|
value = CONF_CUSTOMSEARCH_SETTING_TIMEOUT_READ_DEFAULT
|
|
return value
|
|
|
|
def get_session_key(self):
|
|
"""
|
|
Return session key
|
|
|
|
@type: basestring
|
|
@return: return splunkd session key
|
|
"""
|
|
return self.session_key
|
|
|
|
def get_search_args(self):
|
|
"""
|
|
Get search command argument
|
|
|
|
@type: basestring
|
|
@return: return search command parameters
|
|
"""
|
|
return self.args
|
|
|
|
def read_getinfo(self, fp=None):
|
|
"""
|
|
Read action:getinfo which is send by splunk as first chunk to external commands
|
|
@return:
|
|
"""
|
|
if fp is None:
|
|
fp = self.stdin
|
|
|
|
ret = self.read_chunk(fp)
|
|
if not ret:
|
|
return None
|
|
metadata, body = ret
|
|
return metadata, body
|
|
|
|
def read_chunk(self, fp=None):
|
|
"""
|
|
Read chunk and parse it
|
|
|
|
@type fp: object
|
|
@param fp: file descriptor to read the data from
|
|
|
|
@rtype tuple
|
|
@return: a tuple of metadata and body otherwise log exception and send None
|
|
"""
|
|
worker_results = Queue()
|
|
|
|
worker = Thread(target=self._thread_read_chunk, args=(worker_results, fp))
|
|
worker.daemon = True
|
|
worker.start()
|
|
|
|
# Just let the exception occur if this fails, since the caller of this function might not handle any invalid/error return value properly
|
|
chunk = worker_results.get(block=True, timeout=self.timeout_read)
|
|
|
|
return chunk
|
|
|
|
def _thread_read_chunk(self, results_queue, fp=None):
|
|
"""
|
|
Worker routine for reading chunks
|
|
@type results_queue: Queue
|
|
@param results_queue: queue to place results in
|
|
@type fp: object
|
|
@param fp: file descriptor to read the data from
|
|
"""
|
|
chunk = self._read_chunk(fp)
|
|
results_queue.put(chunk)
|
|
|
|
    def _read_chunk(self, fp=None):
        """
        Read chunk and parse it.

        Reads the transport header one byte at a time (bounded by
        self.timeout_read), then the metadata and body payloads whose lengths
        the header declares.

        @type fp: object
        @param fp: file descriptor to read the data from

        @rtype tuple
        @return: a tuple of (metadata dict, body str); on any failure the
            exception is logged and None is returned
        """
        if fp is None:
            fp = self.stdin

        # When search does not have any data like index=abc (abc index does not present)
        # in that case, we can't read anything from stdin so we wait until Splunk write
        # something in the pipe. Splunk will terminate if it does not write anything in the pipe

        # Because EOF and invalid index both return empty string hence using timeout to avoid
        # infinite loop
        currentTime = time.time()

        header_chars = []

        # Under Python 3 read raw bytes from the underlying buffer so lengths
        # in the header match what we actually consume.
        buf = fp
        if sys.version_info >= (3, 0) and hasattr(fp, 'buffer'):
            buf = fp.buffer

        while True:
            try:
                # Accumulate header bytes until newline, stream close or timeout.
                while not fp.closed and time.time() - currentTime < self.timeout_read:
                    c = buf.read(1).decode()
                    header_chars.append(c)
                    if c == '\n':
                        break
            except Exception as e:
                self.logger.exception(e)
                return None

            header = ''.join(header_chars)
            self.logger.debug("Header='%s'", header)
            if not header or len(header) == 0:
                # If time out log and bail out
                if time.time() - currentTime >= self.timeout_read:
                    self.logger.error("Timeout while reading header of chunk command")
                    return None
                else:
                    # Nothing arrived yet; keep polling until timeout.
                    continue
            else:
                break

        # Transport header format: "chunked 1.0,<metadata_len>,<body_len>\n"
        m = re.match('chunked\s+1.0\s*,\s*(?P<metadata_length>\d+)\s*,\s*(?P<body_length>\d+)\s*\n', header) # noqa W605
        if m is None:
            self.logger.error('Failed to parse transport header: %s', header)
            return None

        try:
            metadata_length = int(m.group('metadata_length'))
            body_length = int(m.group('body_length'))
        except Exception as e:
            self.logger.exception(e)
            self.logger.error('Failed to parse metadata or body length')
            return None

        self.logger.debug('READING CHUNK %d %d', metadata_length, body_length)

        try:
            # Metadata is fed to json.loads (which accepts bytes); only the
            # body is decoded to str for the csv reader downstream.
            metadata_buf = buf.read(metadata_length)
            body = buf.read(body_length)
            if sys.version_info >= (3, 0):
                body = body.decode()
        except Exception as e:
            self.logger.exception(e)
            self.logger.error('Failed to read metadata or body: %s' % str(e))
            return None

        try:
            metadata = json.loads(metadata_buf)
        except Exception as e:
            self.logger.exception(e)
            self.logger.error('Failed to parse metadata JSON')
            return None

        return metadata, body
|
|
|
|
def write_chunk(self, metadata, body, fp=None):
|
|
"""
|
|
Send data to splunk
|
|
@type: fp: object
|
|
@param fp: file descriptor
|
|
|
|
@type metadata: dict
|
|
@param metadata: metadata which needed to be add
|
|
|
|
@type body: basestring
|
|
@param body: data to send to splunk. If it is None then it set to a empty string
|
|
|
|
@return: None
|
|
"""
|
|
self.logger.debug("Started writing chunk metadata=%s, body=%s", metadata, body)
|
|
|
|
fs = FileStringHandler.getInstance()
|
|
fp = fs.get_writer(fp)
|
|
|
|
if body is None:
|
|
body = ''
|
|
encoded_body = fs.encode_string(body)
|
|
metadata_buf = None
|
|
if metadata:
|
|
metadata_buf = fs.encode_string(json.dumps(metadata))
|
|
fp.write(fs.encode_string('chunked 1.0,%d,%d\n' % (len(metadata_buf) if metadata_buf else 0, len(encoded_body))))
|
|
if metadata:
|
|
fp.write(metadata_buf)
|
|
fp.write(encoded_body)
|
|
fp.flush()
|
|
self.logger.debug('Successfully write chunk.')
|
|
|
|
def add_inspector_msg(self, metadata, level, msg):
|
|
"""
|
|
Add inspector message to meta data to show up on splunkd. If error is ERROR level then it is shown as error
|
|
message of the command
|
|
|
|
@type metadata: dict
|
|
@param metadata: meta data of chunk write protocol if it is not defined then set as empty dict
|
|
|
|
@type level - basestring (INFO|DEBUG|ERROR...)
|
|
@param level: message level
|
|
|
|
@type msg: basestring
|
|
@param msg: message
|
|
|
|
@rtype dict
|
|
@return: return metadata back
|
|
"""
|
|
if metadata is None:
|
|
metadata = {}
|
|
|
|
inspector = metadata.setdefault('inspector', {})
|
|
msgs = inspector.setdefault('messages', [])
|
|
if level is not None and msg is not None:
|
|
self.logger.debug('inspector message level=%s, message=%s', level, msg)
|
|
msgs.append([level, msg])
|
|
return metadata
|
|
|
|
    def exit_with_error(self, metadata, msgs):
        """
        Exit command with error message: attach each message to the metadata
        as an ERROR inspector message, send a final (finished=True) chunk,
        log, and terminate the process with exit code 1.

        @type metadata: dict
        @param metadata: metadata; treated as an empty dict when None

        @type msgs: list
        @param msgs: list of error message strings (must be non-empty)

        @return: None (does not return; calls sys.exit(1))
        """
        if metadata is None:
            metadata = {}
        # Mark the search finished so Splunk finalizes it after this chunk.
        metadata['finished'] = True
        for msg in msgs:
            metadata = self.add_inspector_msg(metadata, 'ERROR', msg)
        # NOTE(review): if msgs is empty, `msg` below is undefined — callers
        # appear to always pass at least one message; confirm before relying on it.
        self.write_chunk(metadata, '')
        self.logger.error('Existing external command because of error=%s', msg)
        sys.exit(1)
|
|
|
|
def get_string_buffer(self):
|
|
"""
|
|
Instead of dealing with a string, you should use StringIO buffer which increases
|
|
performance by a factor of two.
|
|
|
|
Return StringIO buffer object
|
|
|
|
@rtype: StringIO
|
|
@return: object
|
|
"""
|
|
import sys
|
|
if sys.version_info >= (3, 0):
|
|
from io import StringIO
|
|
else:
|
|
from StringIO import StringIO
|
|
return StringIO()
|
|
|
|
def validate_search_args(self):
|
|
"""
|
|
(Optional) overwrite
|
|
|
|
This function needs to be over written if there is any additional check requires for search commands
|
|
parameters. This function is invoke during phase 0 getinfo exchange (__init__ function)
|
|
|
|
@rtype: tuple
|
|
@return: Tuple of flag and array of error message
|
|
"""
|
|
return True, []
|
|
|
|
    def pre_processing(self):
        """
        (Optional) overwrite

        Hook invoked by execute() once, after the getinfo exchange and before
        any data chunk is read from splunk. The default does nothing.

        @return: None
        """
        pass
|
|
|
|
    def post_processing(self):
        """
        (optional) overwrite

        Hook invoked by execute() once, after all data chunks have been
        processed. The default does nothing.

        @return: None
        """
        pass
|
|
|
|
    def execute(self):
        """
        Main loop: repeatedly read chunks from splunkd, hand each to the
        subclass's run() as a csv reader, until splunkd marks the stream
        finished or no more data arrives.

        @return: None
        """
        self.pre_processing()
        self.logger.debug("Start reading chunk data...")
        finished = False
        # Count chunk for no data case
        chunk = 0

        while not finished:
            ret = self.read_chunk(sys.stdin)

            self.logger.debug("Read data=%s", ret)
            # break when no more data
            if not ret:
                break
            metadata, body = ret

            # splunkd sets finished=True on the last chunk of the stream
            finished = metadata.get('finished', False)

            # Expose the chunk body as csv rows; run() writes its output via
            # self.writer into self.output_buf.
            reader = csv.DictReader(body.splitlines())
            self.output_buf = self.get_string_buffer()
            self.writer = get_csv_dict_writer(self.output_buf, fieldnames=self.get_field_names(reader))

            self.run(metadata, reader, chunk)

            chunk += 1

        self.post_processing()
        self.logger.debug("Finished sending data...")
|
|
|
|
def get_field_names(self, reader):
|
|
return reader.fieldnames or []
|
|
|
|
    @abc.abstractmethod
    def run(self, metadata, reader, chunk):
        """
        Must override by inherit class
        - Read the passed data and write to splunkd either in chunks or yield based upon
          type of search but you must write back to splunkd

        @type metadata: dict
        @param metadata: meta send by splunkd

        @type reader: csv reader
        @param reader: data send by splunkd (string delimited by \\n) as a csv.DictReader

        @type chunk: int
        @param chunk: zero-based count of which chunk is being processed

        @return: None
        """
        pass
|
|
|
|
|
|
class Singleton(object):
    """
    A non-thread-safe singleton helper, applied as a decorator (not a
    metaclass) to the class that should be a singleton.

    The decorated class may define an `__init__` function. Obtain the single
    instance through `getInstance`; calling the decorated name directly raises
    `TypeError`.

    Limitations: The decorated class cannot be inherited from itself.

    Note: This class is repeated in SA-UserAccess.
    """
    def __init__(self, decorated):
        # Hold the wrapped class; the instance is created lazily on first
        # getInstance call.
        self._decorated = decorated

    def __call__(self):
        raise TypeError('Use `getInstance()` to access the Singleton')

    def __instancecheck__(self, inst):
        # Make isinstance(x, DecoratedClass) keep working.
        return isinstance(inst, self._decorated)

    def getInstance(self, **kwargs):
        """
        Return the singleton instance of the decorated class, creating it on
        first use. On subsequent calls, forward kwargs to the instance's
        `update_singleton` hook when it defines one.
        """
        try:
            existing = self._instance
            updater = getattr(existing, 'update_singleton', None)
            if updater is not None:
                updater(**kwargs)
            return existing
        except AttributeError:
            self._instance = self._decorated(**kwargs)
            return self._instance
|
|
|
|
|
|
@Singleton
class FileStringHandler(object):
    """
    A utility singleton that handles file and string encoding between py2 and
    py3 modes. It ensures the following:

    1. On Windows, a trailing \\n would be mapped to \\r\\n, making the actual
       string length differ from the length reported in the transport header
       (computed with len while the string only contains \\n). Encoding the
       string gives a correct byte length, and also covers unicode input.
    2. Writes go to the raw byte buffer of the target stream under Python 3.
    """
    def __init__(self):
        # True when running under Python 3 (strings must be encoded to bytes).
        self.py3 = sys.version_info >= (3, 0)

    def get_writer(self, fh):
        # Default to stdout; under py3 prefer the underlying binary buffer.
        target = sys.stdout if fh is None else fh
        if self.py3 and hasattr(target, 'buffer'):
            return target.buffer
        return target

    def encode_string(self, s):
        # py3: bytes for the binary stream; py2: pass the str through.
        if self.py3:
            return s.encode('utf-8')
        return s
|