You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Splunk_Deploiement/apps/trackme/lib/trackme_libs_utils.py

345 lines
11 KiB

#!/usr/bin/env python
# coding=utf-8
__author__ = "TrackMe Limited"
__copyright__ = "Copyright 2022-2026, TrackMe Limited, U.K."
__credits__ = "TrackMe Limited, U.K."
__license__ = "TrackMe Limited, all rights reserved"
__version__ = "0.1.0"
__maintainer__ = "TrackMe Limited, U.K."
__email__ = "support@trackme-solutions.com"
__status__ = "PRODUCTION"
# Standard library imports
import os
import sys
import re
import uuid
# splunk home
# Read the Splunk installation root from the environment. The lookup is a
# plain subscript, so importing this module raises KeyError when SPLUNK_HOME
# is not set.
splunkhome = os.environ["SPLUNK_HOME"]
# append lib
# Extend the module search path with <SPLUNK_HOME>/etc/apps/trackme/lib.
sys.path.append(os.path.join(splunkhome, "etc", "apps", "trackme", "lib"))
def get_uuid():
    """
    Generate a random (version 4) UUID and return it in its string form.

    The identifier is used to trace the run_time performance of each subtask.
    """
    task_id = uuid.uuid4()
    return str(task_id)
def remove_leading_spaces(text):
    """
    Strip the leading whitespace of every line in a multi-line string.

    The text is split on "\\n", each piece is left-stripped, and the pieces
    are joined back with "\\n", so the number of lines is preserved.
    """
    stripped_lines = map(str.lstrip, text.split("\n"))
    return "\n".join(stripped_lines)
def decode_unicode(s, replace_with="?"):
    """
    Turn bytes or an escaped string into clean text, preserving UTF-8.

    Processing steps:
    - bytes are decoded as UTF-8, undecodable bytes being replaced;
    - a string containing a literal "\\x" is passed through unicode_escape
      and re-read as UTF-8; if that fails the string is used unchanged;
    - every backslash that is neither preceded by a backslash nor already
      followed by "u005c" is rewritten as the literal text "\\u005c";
    - control characters (0x00-0x1F and 0x7F-0x9F, except tab, newline and
      carriage return) are replaced with replace_with.
    """
    if isinstance(s, bytes):
        text = s.decode("utf-8", errors="replace")
    elif "\\x" in s:
        # Best effort: interpret the escaped bytes, then read them as UTF-8
        try:
            raw = bytes(s, "latin-1")
            unescaped = raw.decode("unicode_escape")
            text = unescaped.encode("latin-1").decode("utf-8", errors="replace")
        except Exception:
            # Any failure means the input is kept exactly as received
            text = s
    else:
        text = s

    # Encode backslashes as \u005c without double-encoding existing ones
    text = re.sub(r"(?<!\\)\\(?!u005c)", r"\\u005c", text)

    # Replace non-printable control characters, keeping \t, \n and \r
    return re.sub(r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\x9F]", replace_with, text)
def encode_unicode(s, replace_with="?"):
    """
    Interpret Unicode escape sequences in a string and return the resulting
    characters. This is the reverse operation of decode_unicode.

    Non-string values are returned untouched. The replace_with parameter is
    accepted but not used by this function.
    """
    if not isinstance(s, str):
        return s

    def hex_to_char(match):
        """Convert the captured hexadecimal digits into the matching character."""
        return chr(int(match.group(1), 16))

    # The \u005c\u00xx pattern must be collapsed before the generic decoding
    if '\\u005c\\u00' in s:
        s = re.sub(r'\\u005c\\u00([0-9a-fA-F]{2})', hex_to_char, s)

    try:
        # Generic pass: handles sequences such as \u00e8 -> è
        decoded = s.encode('latin-1').decode('unicode_escape')
    except (UnicodeDecodeError, UnicodeEncodeError):
        # The codec could not process the string: expand \uXXXX then \xXX by regex
        restored = re.sub(r'\\u([0-9a-fA-F]{4})', hex_to_char, s)
        return re.sub(r'\\x([0-9a-fA-F]{2})', hex_to_char, restored)

    # Nothing left to interpret after the first pass
    if '\\u' not in decoded:
        return decoded

    try:
        # A second pass resolves sequences revealed by the first one
        return decoded.encode('latin-1').decode('unicode_escape')
    except (UnicodeDecodeError, UnicodeEncodeError):
        return re.sub(r'\\u([0-9a-fA-F]{4})', hex_to_char, decoded)
def interpret_boolean(value):
    """
    Leniently interpret a value as a boolean.

    - a bool is returned unchanged
    - a string is True when it equals "true" (case insensitive) or "1"
    - an integer is True only when it equals 1
    - anything else is False
    """
    if isinstance(value, bool):
        return value

    if isinstance(value, str):
        return value.lower() == "true" or value == "1"

    if isinstance(value, int):
        return value == 1

    return False
def strict_interpret_boolean(value):
    """
    Standardize a value to a proper boolean.

    Accepts:
    - String 'true'/'True' or 'false'/'False' (case insensitive)
    - String '0' or '1'
    - Integer 0 or 1
    - Boolean True or False

    Returns:
    - Boolean True or False

    Raises:
    - ValueError if the input cannot be converted to a boolean, including
      integers other than 0 and 1
    """
    if isinstance(value, bool):
        return value

    if isinstance(value, str):
        lowered = value.lower()
        if lowered in ("true", "1"):
            return True
        if lowered in ("false", "0"):
            return False
    elif isinstance(value, int) and value in (0, 1):
        # Only 0 and 1 are valid: any other integer is rejected below rather
        # than being silently coerced to True
        return bool(value)

    raise ValueError("Value must be one of: true/True/1 or false/False/0")
def update_wildcard(object_value):
    """
    Convert wildcard '*' characters into the regex form '.*'.

    Only a '*' that is not immediately preceded by a dot is rewritten, so an
    existing '.*' is left as it is.
    """
    bare_wildcard = re.compile(r"(?<!\.)\*")
    return bare_wildcard.sub(r".*", object_value)
def escape_backslash(object_value):
    """
    Double every backslash found in object_value.
    """
    # The pattern matches one literal backslash; the replacement emits two
    single_backslash = re.compile(r"\\")
    return single_backslash.sub(r"\\\\", object_value)
def replace_encoded_backslashes(object_value):
    """
    Turn each encoded backslash (the literal text "\\u005c") back into a
    single backslash character.
    """
    encoded_backslash = re.compile(r"\\u005c")
    # The replacement template r"\\" produces exactly one backslash
    return encoded_backslash.sub(r"\\", object_value)
def replace_encoded_doublebackslashes(object_value):
    """
    Turn each encoded backslash (the literal text "\\u005c") into two
    backslash characters.
    """
    encoded_backslash = re.compile(r"\\u005c")
    # The replacement template r"\\\\" produces exactly two backslashes
    return encoded_backslash.sub(r"\\\\", object_value)
def replace_encoded_fourbackslashes(object_value):
    """
    Turn each encoded backslash (the literal text "\\u005c") into four
    backslash characters.
    """
    encoded_backslash = re.compile(r"\\u005c")
    # The replacement template r"\\\\\\\\" produces exactly four backslashes
    return encoded_backslash.sub(r"\\\\\\\\", object_value)
def check_tenant_id(value):
    """
    Normalize a tenant name into a tenant identifier.

    The value is trimmed and lowercased, spaces become hyphens, runs of
    underscores are collapsed and turned into a single hyphen, and any
    remaining character that is not a letter, digit or hyphen is replaced
    with a hyphen.

    Returns the normalized identifier as a string.
    """
    # trim, lowercase, and turn spaces into hyphens
    tenant_id = value.strip().lower().replace(" ", "-")

    # collapse consecutive underscores, then convert underscores to hyphens
    tenant_id = re.sub(r"_{1,}", "_", tenant_id)
    tenant_id = re.sub(r"_", "-", tenant_id)

    # anything outside letters, digits and hyphens becomes a hyphen
    return re.sub(r"[^a-zA-Z0-9-]", "-", tenant_id)
def convert_time_to_seconds(time_value):
    """
    Convert a time value with unit to seconds.

    Supports formats:
    - Integer (assumed to be seconds, returned unchanged)
    - String with unit suffix (e.g. "15m", "1h", "1d", "1w"); fractional
      amounts such as "1.5h" are accepted and truncated to whole seconds
    - String or number without suffix (assumed to be seconds)

    Returns the value in seconds as an integer.

    Raises:
    - ValueError if the value cannot be parsed, including non-finite or
      overflowing amounts such as "inf" or "1e999h"
    """
    # Number of seconds represented by each supported unit suffix
    unit_multipliers = {"m": 60, "h": 3600, "d": 86400, "w": 604800}

    try:
        # If it's already an integer, return it
        if isinstance(time_value, int):
            return time_value

        if isinstance(time_value, str):
            # Remove any whitespace, then look at the last character for a unit
            time_value = time_value.strip()
            multiplier = unit_multipliers.get(time_value[-1:])
            if multiplier is not None:
                return int(float(time_value[:-1]) * multiplier)

        # No unit suffix (or not a string): assume the value is in seconds
        return int(float(time_value))

    except (ValueError, TypeError, OverflowError) as exc:
        # OverflowError is raised by int() on infinite floats; report it like
        # any other malformed input so callers only have to catch ValueError
        raise ValueError(
            f"Invalid time value format: {time_value}. Expected format: integer or string with unit suffix (m/h/d/w)"
        ) from exc
def normalize_anomaly_reason(anomaly_reason):
    """
    Normalize the anomaly_reason field into a consistent list of strings.

    Accepted inputs:
    - a single string, optionally holding several reasons separated by a
      pipe, a newline or a comma
    - a list whose items are themselves normalized recursively
    - None, "N/A" or other null-like values, which yield an empty list

    Args:
        anomaly_reason (str, list, None): The input anomaly_reason to normalize.

    Returns:
        list: A sorted list of unique, non-empty reason strings. Returns an
              empty list if no valid reasons are found or if the input type
              is not supported.
    """
    if not anomaly_reason:
        return []

    if isinstance(anomaly_reason, list):
        # Flatten the list by normalizing every item on its own
        candidates = []
        for entry in anomaly_reason:
            candidates.extend(normalize_anomaly_reason(entry))
    elif isinstance(anomaly_reason, str):
        # A string that is only a null-like marker carries no reason
        if anomaly_reason.strip().lower() in ("n/a", "none", "null", ""):
            return []
        candidates = re.split(r"[|\n,]", anomaly_reason)
    else:
        # Unsupported input type
        return []

    # Trim each candidate, drop empty and null-like entries, de-duplicate
    distinct = set()
    for candidate in candidates:
        trimmed = candidate.strip()
        if trimmed and trimmed.lower() not in ("n/a", "none", "null"):
            distinct.add(trimmed)

    return sorted(distinct)