You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
345 lines
11 KiB
345 lines
11 KiB
#!/usr/bin/env python
|
|
# coding=utf-8
|
|
|
|
__author__ = "TrackMe Limited"
|
|
__copyright__ = "Copyright 2022-2026, TrackMe Limited, U.K."
|
|
__credits__ = "TrackMe Limited, U.K."
|
|
__license__ = "TrackMe Limited, all rights reserved"
|
|
__version__ = "0.1.0"
|
|
__maintainer__ = "TrackMe Limited, U.K."
|
|
__email__ = "support@trackme-solutions.com"
|
|
__status__ = "PRODUCTION"
|
|
|
|
# Standard library imports
|
|
import os
|
|
import sys
|
|
import re
|
|
import uuid
|
|
|
|
# Splunk installation root, read from the SPLUNK_HOME environment variable
# (a KeyError is raised at import time when the variable is not set)
splunkhome = os.environ["SPLUNK_HOME"]

# Extend the module search path with the lib directory of the trackme app
sys.path.append(
    os.path.join(splunkhome, "etc", "apps", "trackme", "lib")
)
|
|
|
|
|
|
def get_uuid():
    """
    Generate a unique identifier used to trace the performance run_time of each subtask.

    Returns:
        str: The string form of a freshly generated random (version 4) UUID.
    """
    identifier = uuid.uuid4()
    return str(identifier)
|
|
|
|
|
|
def remove_leading_spaces(text):
    """
    Strip the leading whitespace of every line in a multi-line string.

    Args:
        text (str): The text to process; lines are delimited by "\\n".

    Returns:
        str: The same lines, each left-stripped, joined back with "\\n".
    """
    # lstrip() without arguments removes any leading whitespace, not only spaces
    stripped_lines = map(str.lstrip, text.split("\n"))
    return "\n".join(stripped_lines)
|
|
|
|
|
|
def decode_unicode(s, replace_with="?"):
    """
    Decode a value that may carry escaped bytes and sanitize the result.

    Processing steps:
      1. bytes are decoded as UTF-8 (undecodable bytes become U+FFFD).
      2. a str containing a literal "\\x" is run through unicode_escape and then
         re-read as UTF-8; if that fails for any reason the str is kept unchanged.
      3. every backslash that is neither preceded by another backslash nor
         followed by "u005c" is rewritten to the literal text \\u005c.
      4. control characters (0x00-0x1F and 0x7F-0x9F), except tab, newline and
         carriage return, are substituted with replace_with.

    Args:
        s (bytes | str): The value to decode.
        replace_with (str): Substitute for non-printable characters.

    Returns:
        str: The decoded and cleaned text.
    """
    if isinstance(s, bytes):
        text = s.decode("utf-8", errors="replace")
    elif "\\x" in s:
        # The string holds textual \xNN sequences: turn them into real bytes,
        # then interpret these bytes as UTF-8
        try:
            raw = bytes(s, "latin-1")
            unescaped = raw.decode("unicode_escape")
            text = unescaped.encode("latin-1").decode("utf-8", errors="replace")
        except Exception:
            # Best effort: on any failure keep the original string
            text = s
    else:
        text = s

    # Encode backslashes as \u005c, skipping those already encoded
    text = re.sub(r"(?<!\\)\\(?!u005c)", r"\\u005c", text)

    # Substitute control characters while keeping tab, LF and CR
    return re.sub(r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\x9F]", replace_with, text)
|
|
|
|
|
|
def encode_unicode(s, replace_with="?"):
    """
    Interpret Unicode escape sequences found in a string and return the resulting text.

    This is meant as the reverse operation of decode_unicode.

    Processing steps:
      1. non-str values are returned untouched.
      2. the textual pattern \\u005c\\u00XX is replaced by the character with code XX.
      3. the string is decoded with the unicode_escape codec; when the result
         still contains "\\u", a second unicode_escape pass is attempted, with a
         regex substitution of \\uXXXX sequences as fallback.
      4. if the first pass cannot be performed (for instance the string holds
         characters outside latin-1), \\uXXXX and \\xXX sequences are substituted
         with a regex instead.

    Args:
        s: The value to process.
        replace_with (str): Not used by this function.

    Returns:
        The processed string, or s itself when it is not a str.
    """
    if not isinstance(s, str):
        return s

    def _hex_to_char(match):
        """Turn the captured hexadecimal code point into its character."""
        return chr(int(match.group(1), 16))

    # Handle the \u005c\u00xx pattern before the generic unicode_escape decoding
    if "\\u005c\\u00" in s:
        s = re.sub(r"\\u005c\\u00([0-9a-fA-F]{2})", _hex_to_char, s)

    try:
        first_pass = s.encode("latin-1").decode("unicode_escape")
    except (UnicodeDecodeError, UnicodeEncodeError):
        # The codec route is not usable: substitute \uXXXX first, then \xXX
        restored = re.sub(r"\\u([0-9a-fA-F]{4})", _hex_to_char, s)
        return re.sub(r"\\x([0-9a-fA-F]{2})", _hex_to_char, restored)

    if "\\u" not in first_pass:
        return first_pass

    # Some sequences are still escaped: try one more decoding pass
    try:
        return first_pass.encode("latin-1").decode("unicode_escape")
    except (UnicodeDecodeError, UnicodeEncodeError):
        return re.sub(r"\\u([0-9a-fA-F]{4})", _hex_to_char, first_pass)
|
|
|
|
|
|
def interpret_boolean(value):
    """
    Leniently interpret a value as a boolean.

    Args:
        value: The value to interpret.

    Returns:
        bool: The value itself when it is already a bool; True for the string
        "true" (case insensitive), the string "1" or the integer 1; False for
        anything else, including unsupported types.
    """
    # bool must be tested first since bool is a subclass of int
    if isinstance(value, bool):
        return value

    if isinstance(value, str):
        return value == "1" or value.lower() == "true"

    if isinstance(value, int):
        return value == 1

    return False
|
|
|
|
|
|
def strict_interpret_boolean(value):
    """
    Standardize a value to a proper boolean.

    Accepts:
        - String 'true'/'True' or 'false'/'False' (case insensitive)
        - String '0' or '1'
        - Integer 0 or 1
        - Boolean True or False

    Args:
        value: The value to convert.

    Returns:
        bool: True or False.

    Raises:
        ValueError: If the input cannot be converted to a boolean. This
            includes integers other than 0 and 1, unknown strings and
            unsupported types.
    """
    # bool must be tested first since bool is a subclass of int
    if isinstance(value, bool):
        return value

    if isinstance(value, str):
        lowered = value.lower()
        if lowered in ("true", "1"):
            return True
        if lowered in ("false", "0"):
            return False

    # Only 0 and 1 are valid integers; other values are rejected rather than
    # being silently coerced through bool()
    elif isinstance(value, int) and value in (0, 1):
        return bool(value)

    raise ValueError("Value must be one of: true/True/1 or false/False/0")
|
|
|
|
|
|
def update_wildcard(object_value):
    """
    Turn shell-like wildcards into their regular expression form.

    Every '*' which is not immediately preceded by a dot is replaced with
    '.*', so that the value can be interpreted as a regex.

    Args:
        object_value (str): The value that may contain '*' wildcards.

    Returns:
        str: The value with bare '*' replaced by '.*'.
    """
    # Negative lookbehind: leave an already regex-friendly '.*' untouched
    bare_star = re.compile(r"(?<!\.)\*")
    return bare_star.sub(r".*", object_value)
|
|
|
|
|
|
def escape_backslash(object_value):
    """
    Escape the backslashes found in object_value.

    Args:
        object_value (str): The value to process.

    Returns:
        str: The value where each backslash has been doubled.
    """
    # The pattern matches one literal backslash, the template emits two
    single_backslash = re.compile(r"\\")
    return single_backslash.sub(r"\\\\", object_value)
|
|
|
|
|
|
def replace_encoded_backslashes(object_value):
    """
    Replace encoded backslashes with actual backslashes.

    Args:
        object_value (str): The value that may contain the literal text \\u005c.

    Returns:
        str: The value where each \\u005c occurrence became a single backslash.
    """
    # The pattern matches the six characters \u005c, the template emits one backslash
    encoded_backslash = re.compile(r"\\u005c")
    return encoded_backslash.sub(r"\\", object_value)
|
|
|
|
|
|
def replace_encoded_doublebackslashes(object_value):
    """
    Replace encoded backslashes with double backslashes.

    Args:
        object_value (str): The value that may contain the literal text \\u005c.

    Returns:
        str: The value where each \\u005c occurrence became two backslashes.
    """
    # The pattern matches the six characters \u005c, the template emits two backslashes
    encoded_backslash = re.compile(r"\\u005c")
    return encoded_backslash.sub(r"\\\\", object_value)
|
|
|
|
|
|
def replace_encoded_fourbackslashes(object_value):
    """
    Replace encoded backslashes with four backslashes.

    Args:
        object_value (str): The value that may contain the literal text \\u005c.

    Returns:
        str: The value where each \\u005c occurrence became four backslashes.
    """
    # The pattern matches the six characters \u005c, the template emits four backslashes
    encoded_backslash = re.compile(r"\\u005c")
    return encoded_backslash.sub(r"\\\\\\\\", object_value)
|
|
|
|
|
|
def check_tenant_id(value):
    """
    Normalize a tenant identifier.

    The value is trimmed, lowercased, and any space, underscore or other
    character which is not a letter, a digit or a hyphen is turned into a
    hyphen. A run of consecutive underscores results in a single hyphen.

    Args:
        value (str): The raw tenant identifier.

    Returns:
        str: The normalized tenant identifier.
    """
    # trim, lowercase and turn spaces into hyphens
    normalized = value.strip().lower().replace(" ", "-")

    # ordered substitutions, each one applied on the result of the previous one
    substitutions = (
        # collapse runs of underscores into a single underscore
        (r"_{1,}", "_"),
        # replace any underscore with a hyphen
        (r"_", "-"),
        # replace anything that is not a letter, number or hyphen with a hyphen
        (r"[^a-zA-Z0-9-]", "-"),
    )
    for pattern, replacement in substitutions:
        normalized = re.sub(pattern, replacement, normalized)

    return normalized
|
|
|
|
|
|
def convert_time_to_seconds(time_value):
    """
    Convert a time value with unit to seconds.

    Supports formats:
        - Integer (assumed to be seconds, returned as is)
        - String with unit suffix (e.g. "15m", "1h", "1d", "1w"); the numeric
          part may be fractional (e.g. "1.5h")
        - String or number without suffix (assumed to be seconds)

    Args:
        time_value (int | float | str): The time value to convert.

    Returns:
        int: The value in seconds; fractional results are truncated.

    Raises:
        ValueError: If the value cannot be interpreted, including values which
            are not finite (e.g. "inf" or "1e400").
    """
    # number of seconds for each supported unit suffix
    unit_multipliers = {"m": 60, "h": 3600, "d": 86400, "w": 604800}

    try:
        # If it's already an integer, return it
        if isinstance(time_value, int):
            return time_value

        # If it's a string, look for a unit suffix
        if isinstance(time_value, str):
            time_value = time_value.strip()

            # slicing keeps this safe for an empty string
            multiplier = unit_multipliers.get(time_value[-1:])
            if multiplier is not None:
                return int(float(time_value[:-1]) * multiplier)

        # No suffix or another type: assume seconds
        return int(float(time_value))

    except (ValueError, TypeError, OverflowError) as exc:
        # OverflowError is raised by int() on infinite floats; it is reported
        # as ValueError like any other invalid input
        raise ValueError(
            f"Invalid time value format: {time_value}. Expected format: integer or string with unit suffix (m/h/d/w)"
        ) from exc
|
|
|
|
|
|
def normalize_anomaly_reason(anomaly_reason):
    """
    Normalize the anomaly_reason field into a consistent list of strings.

    Supported inputs:
        - a string, possibly holding several reasons separated by a pipe, a
          newline or a comma
        - a list whose items are themselves normalized recursively
        - None, empty values and null-like strings ("N/A", "none", "null")

    Args:
        anomaly_reason (str, list, None): The input anomaly_reason to normalize.

    Returns:
        list: A sorted list of unique, non-empty reason strings. An empty list
        is returned when no valid reason is found or when the input type is
        not supported.
    """
    null_like = ("n/a", "none", "null")

    # None, empty string and empty list all end up here
    if not anomaly_reason:
        return []

    if isinstance(anomaly_reason, list):
        # Each item goes through the same normalization
        candidates = []
        for entry in anomaly_reason:
            candidates.extend(normalize_anomaly_reason(entry))

    elif isinstance(anomaly_reason, str):
        # Ignore common null-like values
        if anomaly_reason.strip().lower() in ("n/a", "none", "null", ""):
            return []
        # Split by pipe, newline, or comma
        candidates = re.split(r"[|\n,]", anomaly_reason)

    else:
        # Any other type cannot be processed
        return []

    # Trim each reason, drop empty and null-like entries, and de-duplicate
    cleaned = set()
    for candidate in candidates:
        trimmed = candidate.strip()
        if trimmed and trimmed.lower() not in null_like:
            cleaned.add(trimmed)

    return sorted(cleaned)
|