You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
594 lines
24 KiB
594 lines
24 KiB
#
|
|
# Copyright 2024 Splunk Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
import collections
|
|
import csv
|
|
import json
|
|
import logging
|
|
import logging.handlers
|
|
import os
|
|
import random
|
|
import re
|
|
import time
|
|
|
|
import splunk.rest as rest
|
|
from splunk.clilib.bundle_paths import make_splunkhome_path
|
|
from splunk.util import mktimegm, normalizeBoolean
|
|
|
|
# Raise the csv module's per-field size ceiling.
#
# The csv module defaults to a 128KB limit; 10MB is used here instead.
# See SPL-12117 for the background on issues surrounding field sizes.
csv.field_size_limit(10 * 1024 * 1024)
|
|
|
|
|
|
class InvalidResultID(Exception):
    """Raised when a result does not carry a usable ``rid`` identifier."""
|
|
|
|
|
|
class ModularAction:
    """Helper base class for modular action scripts.

    The class parses the JSON action payload, keeps track of the per-result
    identifiers (rid, orig_sid, orig_rid), emits standardized log messages,
    converts results into stash-formatted events and writes those events to
    files under var/spool/splunk.
    """

    DEFAULT_MSGFIELDS = [
        "signature",
        "action_name",
        "search_name",
        "sid",
        "orig_sid",
        "rid",
        "orig_rid",
        "app",
        "user",
        "action_mode",
        "action_status",
    ]
    DEFAULT_MESSAGE = "sendmodaction - " + " ".join(
        ['{i}="{{d[{i}]}}"'.format(i=i) for i in DEFAULT_MSGFIELDS]
    )
    # The above yields a string.format() compatible format string:
    #
    # 'sendmodaction - signature="{d[signature]}" action_name="{d[action_name]}"
    # search_name="{d[search_name]}" sid="{d[sid]}" orig_sid="{d[orig_sid]}"
    # rid="{d[rid]}" orig_rid="{d[orig_rid]}" app="{d[app]}" user="{d[user]}"
    # action_mode="{d[action_mode]}" action_status="{d[action_status]}"'

    # Returns True for field names that result2stash() should drop.
    DEFAULT_DROPEXP = lambda x: (
        (x.startswith("_") and x not in ["_raw", "_time"])
        or x.startswith("date_")
        or x in ["punct", "sid", "rid", "orig_sid", "orig_rid"]
    )

    # Returns True for field names that result2stash() should emit with an
    # "orig_" prefix.
    DEFAULT_MAPEXP = lambda x: (
        x.startswith("tag::")
        or x
        in [
            "_time",
            "_raw",
            "splunk_server",
            "index",
            "source",
            "sourcetype",
            "host",
            "linecount",
            "timestartpos",
            "timeendpos",
            "eventtype",
            "tag",
            "search_name",
            "event_hash",
            "event_id",
        ]
    )

    DEFAULT_HEADER = '***SPLUNK*** index="%s" host="%s" source="%s"'
    DEFAULT_BREAKER = "==##~~##~~ 1E8N3D4E6V5E7N2T9 ~~##~~##==\n"
    DEFAULT_IDLINE = '***Common Action Model*** orig_action_name="%s" orig_sid="%s" orig_rid="%s" sourcetype="%s"\n'
    DEFAULT_INDEX = "summary"
    DEFAULT_CHUNK = 50000

    SHORT_FORMAT = "%(asctime)s %(levelname)s %(message)s"

    # Extra message() keyword names must be plain ASCII identifiers; anything
    # else could break the str.format() template built from them.
    _KWARG_KEY_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")
    # Matches a key="" pair (plus the whitespace after it) in a message.
    _EMPTY_PAIR_RE = re.compile(r'[A-Za-z_]+=""(\s|$)')

    def __init__(self, settings, logger, action_name="unknown"):
        """Initialize ModularAction class.

        @param settings:    A modular action payload in JSON format.
        @param logger:      A logging instance.
                            Recommend using ModularAction.setup_logger.
        @param action_name: The action name.
                            action_name in payload will take precedence.
        """
        self.settings = json.loads(settings)
        self.logger = logger
        self.session_key = self.settings.get("session_key")
        self.sid = self.settings.get("sid")
        self.sid_snapshot = ""
        ## if sid contains rt_scheduler with snapshot-sid; drop snapshot-sid
        ## sometimes self.sid may be a number (1465593470.1228), in which
        ## case there is nothing to match against
        if isinstance(self.sid, str):
            rtsid = re.match(r"^(rt_scheduler.*)\.(\d+)$", self.sid)
            if rtsid:
                self.sid = rtsid.group(1)
                self.sid_snapshot = rtsid.group(2)

        ## rid_ntuple is a named tuple that represents
        ## the three variables that change on a per-result basis
        self.rid_ntuple = collections.namedtuple("ID", ["orig_sid", "rid", "orig_rid"])
        ## rids is a list of rid_ntuple values
        ## automatically maintained by update() calls
        self.rids = []
        ## current orig_sid based on update()
        ## aka self.rids[-1].orig_sid
        self.orig_sid = ""
        ## current rid based on update()
        ## aka self.rids[-1].rid
        self.rid = ""
        ## current orig_rid based on update()
        ## aka self.rids[-1].orig_rid
        self.orig_rid = ""

        self.results_file = self.settings.get("results_file")
        ## info
        self.info = {}
        ## always define info_file so addinfo() can test it safely even
        ## when the payload carries no results_file
        self.info_file = None
        if self.results_file:
            self.info_file = os.path.join(
                os.path.dirname(self.results_file), "info.csv"
            )
        self.search_name = self.settings.get("search_name")
        self.app = self.settings.get("app")
        self.user = self.settings.get("user") or self.settings.get("owner")
        self.configuration = self.settings.get("configuration", {})
        ## enforce configuration is a 'dict'
        if not isinstance(self.configuration, dict):
            self.configuration = {}
        ## set loglevel to DEBUG if verbose
        if normalizeBoolean(self.configuration.get("verbose", "false")):
            self.logger.setLevel(logging.DEBUG)
            self.logger.debug("loglevel set to DEBUG")
        ## use | sendalert param.action_name=$action_name$
        self.action_name = self.configuration.get("action_name") or action_name
        ## use sid to determine action_mode
        if isinstance(self.sid, str) and "scheduler" in self.sid:
            self.action_mode = "saved"
        else:
            self.action_mode = "adhoc"

        self.action_status = ""
        ## Since we don't use the result object we get from settings it will be purged
        self.settings.pop("result", None)
        ## events
        self.events = []

    def addinfo(self):
        """Populate self.info with the first row of info.csv.

        Does nothing when no info file path is known. Any failure to open
        or parse the file is not raised; it is reported through message()
        at WARNING level and self.info is left unchanged.
        """
        if not self.info_file:
            return
        try:
            # newline="" is what the csv module expects of file objects
            with open(self.info_file, newline="") as fh:
                self.info = next(csv.DictReader(fh))
        except Exception:
            self.message("Could not retrieve info.csv", level=logging.WARNING)

    def addjobinfo(self):
        """Populate self.job with the job content from REST
        (/services/search/jobs/<sid>).

        SPL-112815 - sendalert - not all $job.<param>$ parameters come through

        self.job is reset to an empty dict first. A non-200 response or any
        exception during the request/parsing is not raised; it is reported
        through message() at WARNING level and self.job stays empty.
        """
        self.job = {}
        if not self.sid:
            return
        try:
            response, content = rest.simpleRequest(
                "search/jobs/%s" % self.sid,
                sessionKey=self.session_key,
                getargs={"output_mode": "json"},
            )
            if response.status == 200:
                self.job = json.loads(content)["entry"][0]["content"]
                self.message("Successfully retrieved search job info")
                self.logger.debug(self.job)
            else:
                self.message(
                    "Could not retrieve search job info", level=logging.WARNING
                )
        except Exception:
            self.message("Could not retrieve search job info", level=logging.WARNING)

    def message(self, signature, status=None, rids=None, level=logging.INFO, **kwargs):
        """The purpose of this method is to provide a common messaging interface.

        @param signature: A string representing the message we want to log.
        @param status:    An optional status that we want to log.
                          Defaults to None (falls back to self.action_status).
        @param rids:      An optional list of rid_ntuple values in case we
                          want to generate the message for multiple rids.
                          Defaults to None (use the rid currently loaded).
        @param level:     The logging level to use when writing the message.
                          Defaults to logging.INFO (INFO)
        @param kwargs:    Additional keyword arguments to be included with the
                          message. Names that duplicate DEFAULT_MSGFIELDS or
                          that are not plain ASCII identifiers are ignored.
                          Defaults to "no arguments".

        @return message:  This method logs the message; however, for
                          backwards compatibility we also return the message.
                          With several rids the message of the last rid is
                          returned; with an empty rids list, or when the last
                          entry could not be unpacked, "" is returned.
        """
        ## status
        status = status or self.action_status or ""
        ## rid
        if not isinstance(rids, list):
            rids = [self.rid_ntuple(self.orig_sid, self.rid, self.orig_rid)]
        ## kwargs - prune any duplicate keys based on DEFAULT_MSGFIELDS
        ##          prune any keys with special characters
        extra_keys = [
            key
            for key in kwargs
            if key not in ModularAction.DEFAULT_MSGFIELDS
            and ModularAction._KWARG_KEY_RE.fullmatch(key)
        ]
        ## MSG
        template = "{} {}".format(
            ModularAction.DEFAULT_MESSAGE,
            " ".join(['{i}="{{d[{i}]}}"'.format(i=i) for i in extra_keys]),
        )

        # This will set the default value of any value NOT in the dictionary to the
        # empty string.
        argsdict = collections.defaultdict(str)
        # order is important here - here we update first from kwargs, then from our
        # expected arg set.
        argsdict.update(kwargs)
        argsdict.update(
            {
                "signature": signature or "",
                "action_name": self.action_name or "",
                "search_name": self.search_name or "",
                "sid": self.sid or "",
                "app": self.app or "",
                "user": self.user or "",
                "action_mode": self.action_mode or "",
                "action_status": status,
            }
        )

        # Defined up front so an empty rids list returns "" instead of
        # failing on an unbound name.
        message = ""
        for rid_ntuple in rids:
            if len(rid_ntuple) != 3:
                self.logger.warning("Could not unpack rid_ntuple")
                message = ""
                continue
            ## Update the arguments dictionary
            argsdict.update(
                {
                    "orig_sid": rid_ntuple.orig_sid or "",
                    "rid": rid_ntuple.rid or "",
                    "orig_rid": rid_ntuple.orig_rid or "",
                }
            )
            ## The format string reads its values from "argsdict"
            message = template.format(d=argsdict)
            ## prune empty string key-value pairs
            message = ModularAction._EMPTY_PAIR_RE.sub("", message).strip()
            self.logger.log(level, message)

        return message

    def update(self, result):
        """The purpose of this method is to update the ModularAction instance
        identifiers based on the current result being operated on.

        This is the most important method in the library as it sets up
        rid, orig_sid, and orig_rid to be used by subsequent class methods.

        Not calling update() immediately for each result before doing additional
        work can have adverse effects.

        @param result: The result dictionary currently being operated on.
                       Must contain a "rid" key holding a str or int;
                       "orig_sid" and "orig_rid" are optional and default
                       to "".

        @raise InvalidResultID: if the result has no "rid" or the value is
                                neither a str nor an int.
        """
        ## These are for events/results that were created as the result of a previous action
        self.orig_sid = result.get("orig_sid", "")
        self.orig_rid = result.get("orig_rid", "")
        if "rid" not in result or not isinstance(result["rid"], (str, int)):
            raise InvalidResultID("Result must have an ID")
        self.rid = str(result["rid"])
        ## real-time scheduled searches: qualify the rid with the snapshot id
        if self.sid_snapshot:
            self.rid = f"{self.rid}.{self.sid_snapshot}"
        ## add result info to list of named tuples
        self.rids.append(self.rid_ntuple(self.orig_sid, self.rid, self.orig_rid))

    def invoke(self):
        """The purpose of this method is to generate per-result invocation messages.
        This method is used to identify that an action is being attempted on a per-result basis.

        Remember to call update() prior to invoke() to ensure that the invocation message
        reflects the appropriate identifiers.
        """
        self.message("Invoking modular action")

    def result2stash(
        self, result, dropexp=DEFAULT_DROPEXP, mapexp=DEFAULT_MAPEXP, addinfo=False
    ):
        """The purpose of this method is to formulate an event in stash format

        @param result:  The result dictionary to generate a stash event for.
                        When addinfo is True the info_* keys are written
                        into this dictionary.
        @param dropexp: A callable used to determine whether a field
                        should be dropped or not.
                        Defaults to DEFAULT_DROPEXP.
        @param mapexp:  A callable used to determine whether a field
                        should be mapped (prepended with "orig_") or not.
                        Defaults to DEFAULT_MAPEXP.
        @param addinfo: Whether or not to add search information to the event.
                        "info" includes the info_min_time, info_max_time,
                        and info_search_time fields.
                        Requires that information was loaded into the ModularAction
                        instance via addinfo()

        @return _raw:   Returns a string which represents the result in stash format.

        The following example has been broken onto multiple lines for readability:
        06/21/2016 10:00:00 -0700,
        search_name="Access - Brute Force Access Behavior Detected - Rule",
        info_min_time=1466528400.000, info_max_time=1466532600.000, info_search_time=1465296264.179,
        key1=key1val, key2=key2val, key3=key3val, key4=key4val1, key4=key4val2, ...
        """
        dropexp = dropexp or (lambda x: False)
        mapexp = mapexp or (lambda x: False)

        def orig_dropexp(name):
            # An explicit "orig_<x>" field is dropped when "<x>" is also in
            # the result and will itself be mapped to "orig_<x>".
            return name.startswith("orig_") and name[5:] in result and mapexp(name[5:])

        ## addinfo
        if addinfo:
            result["info_min_time"] = self.info.get("_search_et", "0.000")
            info_max_time = self.info.get("_search_lt")
            if not info_max_time or info_max_time == "0":
                info_max_time = "+Infinity"
            result["info_max_time"] = info_max_time
            result["info_search_time"] = self.info.get("_timestamp", "")

        ## construct _raw; only compute the current time when it is needed
        if "_time" in result:
            event_time = result["_time"]
        else:
            event_time = mktimegm(time.gmtime())
        parts = ["%s" % event_time]
        if self.search_name:
            parts.append(', search_name="%s"' % self.search_name)

        processed_keys = set()
        for key, val in sorted(result.items()):
            if key.startswith("__mv_"):
                ## only a proper mv field ($v1$;$v2$) is usable
                if not (
                    val
                    and isinstance(val, str)
                    and val.startswith("$")
                    and val.endswith("$")
                ):
                    continue
                real_key = key[5:]
                vals = val[1:-1].split("$;$")
            elif val:
                ## proper sv field
                real_key = key
                vals = [val]
            else:
                continue

            ## skip keys already emitted and keys that are to be dropped
            if (
                real_key in processed_keys
                or dropexp(real_key)
                or orig_dropexp(real_key)
            ):
                continue

            out_key = "orig_" + real_key.lstrip("_") if mapexp(real_key) else real_key
            for item in vals:
                if isinstance(item, str):
                    ## format literal '$'
                    if key.startswith("__mv"):
                        item = item.replace("$$", "$")
                    ## escape quotes
                    item = item.replace('"', r"\"")
                parts.append(f', {out_key}="{item}"')
            processed_keys.add(real_key)

        return "".join(parts)

    def addevent(self, raw, sourcetype, cam_header=True):
        """The purpose of this method is to add a properly constructed event
        to the events list in the ModularAction instance.  This ensures events
        are created with the appropriate index-time header.

        The index-time header is responsible for setting sourcetype,
        orig_action_name, orig_sid, and orig_rid.  The index-time header will
        not be present in the _raw of generated events.

        Remember to call update() prior to addevent() to ensure that the events
        reflect the appropriate orig_sid and orig_rid identifiers.

        @param raw:        The text of the event you want to generate.
        @param sourcetype: The sourcetype of the event you want to generate.
        @param cam_header: Optionally exclude the inclusion of the index-time header.
                           Defaults to True (include header).
        """
        if not cam_header:
            self.events.append(raw)
            return
        ## prefer the identifiers inherited from a previous action; fall back
        ## to this search's own sid/rid
        if self.orig_sid:
            ids = (self.orig_sid, self.orig_rid)
        else:
            ids = (self.sid, self.rid)
        action_idline = ModularAction.DEFAULT_IDLINE % (
            self.action_name,
            ids[0],
            ids[1],
            sourcetype,
        )
        self.events.append(action_idline + raw)

    def writeevents(
        self, index="summary", host=None, source=None, fext="common_action_model"
    ):
        """The purpose of this method is to create arbitrary splunk events
        from the list of events in the ModularAction instance.

        Please use addevent() for populating the list of events in
        the ModularAction instance.

        @param index:  The index to write the events to.
                       Defaults to "summary".
        @param host:   The value of host the events should take on.
                       Defaults to None (auto).
        @param source: The value of source the events should take on.
                       Defaults to None (auto).
        @param fext:   The extension of the file to write out.
                       Files are written to $SPLUNK_HOME/var/spool/splunk.
                       File extensions can only contain word characters,
                       dash, and have a 200 char max.
                       "stash_" is automatically prepended to all extensions.
                       Defaults to "common_action_model" ("stash_common_action_model").
                       Only override if you've set up a corresponding props.conf
                       stanza to handle the extension.

        @return bool:  Returns True if all events were successfully written
                       Returns False if there were no events to write or if
                       any errors were encountered
        """

        ## internal helper for normalizing strings
        ## that will be used in the various headers we write out
        def get_string(value, default):
            try:
                return value.replace('"', "_")
            except AttributeError:
                return default

        if not self.events:
            return False

        ## sanitize file extension (fullmatch also rejects a trailing newline)
        if not fext or not re.fullmatch(r"[\w-]+", fext):
            self.logger.warning(
                "Requested file extension was ignored due to invalid characters"
            )
            fext = "common_action_model"
        elif len(fext) > 200:
            self.logger.warning("Requested file extension was ignored due to length")
            fext = "common_action_model"
        ## header
        header_line = ModularAction.DEFAULT_HEADER % (
            get_string(index, ModularAction.DEFAULT_INDEX),
            get_string(host, ""),
            get_string(source, ""),
        )
        default_breaker = "\n" + ModularAction.DEFAULT_BREAKER
        ## process event chunks
        for start in range(0, len(self.events), ModularAction.DEFAULT_CHUNK):
            chunk = self.events[start : start + ModularAction.DEFAULT_CHUNK]
            ## initialize output string
            fout = header_line + default_breaker + default_breaker.join(chunk)
            ## write output string; fp is pre-set so the error handler can
            ## always reference it
            fp = None
            try:
                fn = "{}_{}.stash_{}".format(
                    mktimegm(time.gmtime()),
                    random.randint(0, 100000),
                    fext,
                )
                fp = make_splunkhome_path(["var", "spool", "splunk", fn])
                with open(fp, "w") as fh:
                    fh.write(fout)
            except Exception:
                signature = "Error obtaining file handle during makeevents"
                self.message(signature, level=logging.ERROR, file_path=fp)
                self.logger.exception(signature + " file_path=%s" % fp)
                return False
        self.message(
            "Successfully created splunk events", event_count=len(self.events)
        )
        return True

    def dowork(self):
        """This method serves as an illustration stub.
        Serves as a container for operations which satisfy the nature of the action.
        For instance, the third party API call.

        For cleanliness it is recommended that you subclass ModularAction
        and implement your own dowork() method.
        """
        return

    @staticmethod
    def setup_logger(
        name, level=logging.INFO, maxBytes=25000000, backupCount=5, format=SHORT_FORMAT
    ):
        """Set up a logging instance.

        @param name:        The log file name.
                            We recommend "$action_name$_modalert".
        @param level:       The logging level.
        @param maxBytes:    The maximum log file size before rollover.
        @param backupCount: The number of log files to retain.
        @param format:      The logging format string for the file handler.
                            Defaults to SHORT_FORMAT.

        @return logger:     Returns an instance of logger
        """
        logfile = make_splunkhome_path(["var", "log", "splunk", name + ".log"])
        logger = logging.getLogger(name)
        logger.setLevel(level)
        logger.propagate = False  # Prevent the log messages from being duplicated in the python.log file

        # Prevent re-adding handlers to the logger object, which can cause duplicate log lines.
        # Handlers that are not file based have no baseFilename, hence getattr.
        handler_exists = any(
            getattr(h, "baseFilename", None) == logfile for h in logger.handlers
        )
        if not handler_exists:
            file_handler = logging.handlers.RotatingFileHandler(
                logfile, maxBytes=maxBytes, backupCount=backupCount
            )
            file_handler.setFormatter(logging.Formatter(format))
            logger.addHandler(file_handler)

        return logger
|