# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
"""
ITOA-8115: remove dependencies of SA-ITOA from SA-ITSI-Licensechecker.
Manually copied to apps/SA-ITSI-Licensechecker/lib/ITOA/setup_logging.py
If you change this file, please also update the copy.
"""
import errno
import inspect
import logging
# FIX: logging.handlers is a submodule and is NOT imported by "import logging";
# RotatingFileHandler below would raise AttributeError unless some other module
# happened to import logging.handlers first.
import logging.handlers
import os
import sys
import threading
from builtins import object
from contextlib import contextmanager
from future.utils import with_metaclass
from time import strftime, time
from uuid import uuid1

from splunk import setupSplunkLogger
from splunk.Intersplunk import readResults
from splunk.clilib.bundle_paths import make_splunkhome_path

# On Windows System, suppress all logging exceptions
if os.name == 'nt':
    logging.raiseExceptions = False

LOG_DEFAULT_FMT = '%(asctime)s process:%(process)d thread:%(threadName)s %(levelname)s [%(name)s] [%(module)s:%(' \
                  'lineno)d] [%(funcName)s] %(message)s'

SEARCH_LOG_FILE = 'itsi_search.log'

"""
Please add new logger name and its log file information here, so we can have information at one place
Logger Standards https://splunk.atlassian.net/wiki/spaces/PROD/pages/313671861854/ITSI+Logging+System+Refactoring
"""


class TZMillisLogFormatter(logging.Formatter):
    """
    Log formatter that renders the record timestamp with millisecond precision
    and timezone offset, matching TIME_FORMAT in SA_ITOA/default/props.conf.
    """

    def formatTime(self, record, datefmt=None):
        """
        Format the record's creation time.

        @param record: the log record being formatted
        @param datefmt: not supported; this formatter owns its time format
        @raise TypeError: if datefmt is supplied
        @return: formatted timestamp string, e.g. "2025-01-01 12:00:00,123+0000"
        """
        if datefmt is not None:
            raise TypeError('This logging Formatter does not accept datefmt')
        created_time = self.converter(record.created)
        # Python logger has its own way to keep track for time for a log record, it goes extra mile to capture
        # additional dictionary value for millisecond portion for a log record creation timestamp (msecs).
        # We tried to use %f to format raw time, however, %f is not always available with python builds and at the time
        # of bug fix was not available with a python packaged in Splunk.
        # Solution: format time with all the available modifiers per Splunk's python and borrow millisecond portion
        # from logger, format it with a leading zeros (%03d) and inject the value in place of dummy placeholder (MILLIS)
        # to produce final version of time formatted string. Final format is made to match TIME_FORMAT in
        # SA_ITOA/default/props.conf, it's important to keep them in sync for correct parsing by Splunk indexers.
        formatted_time = strftime('%Y-%m-%d %H:%M:%S,MILLIS%z', created_time)
        s = formatted_time.replace('MILLIS', '%03d' % record.msecs)
        return s


def setup_logging(logfile_name=None, logger_name=None, logger=None, level=logging.INFO, is_console_header=False,
                  log_format=LOG_DEFAULT_FMT, is_propagate=False):
    """
    Setup logging

    @param logfile_name: log file name
    @param logger_name: logger name (if logger specified then we ignore this argument)
    @param logger: logger object
    @param level: logging level
    @param is_console_header: set to true if console logging is required
    @param log_format: log message format
    @param is_propagate: set to true if you want to propagate log to higher level
    @raise ValueError: if neither a logger object nor both names are supplied
    @return: logger
    """
    if (logfile_name is None or logger_name is None) and logger is None:
        raise ValueError("log_name or logger_name is not specified and logger object is not provided.")

    if logger is None:
        # Logger is singleton so if logger is already defined it will return old handler
        logger = logging.getLogger(logger_name)
    # Unwrap the ItsiLogger proxy so we always work with a plain logging.Logger.
    logger = logger.logger if isinstance(logger, ItsiLogger) else logger

    # Save the handlers before overwriting logger.
    # If handlers is already defined then do not create new handler, this way we can avoid file opening again
    # which is issue on windows see ITOA-2439 for more information
    has_file_handler = False
    has_console_handler = False
    handler_format = None
    for handler in logger.handlers:
        if isinstance(handler, logging.handlers.RotatingFileHandler):
            handler_format = handler_format if handler_format else handler.formatter
            has_file_handler = True
        elif isinstance(handler, logging.StreamHandler):
            handler_format = handler_format if handler_format else handler.formatter
            has_console_handler = True

    # If logger_name is None: will create a child logger with different properties from parent.
    # If the given logger_name is not equal to the existing logger's name, also will create a child logger
    if logger_name is None or logger.name != logger_name:
        # dot(.) in the two log names make the new logger is the child of the existing logger,
        # so new handlers added to the new one will not impact the old one.
        logger = logging.getLogger("%s.%s" % (logger.name, logger_name if logger_name else "sub"))

    # Prevent the log messages from being duplicated in the python.log file
    logger.propagate = is_propagate
    logger.setLevel(level)

    if not has_file_handler:
        try:
            lockdir = make_splunkhome_path(['var', 'itsi', 'lock'])
            if not os.path.exists(os.path.dirname(lockdir)):
                os.mkdir(make_splunkhome_path(['var', 'itsi']))
                os.mkdir(make_splunkhome_path(['var', 'itsi', 'lock']))
            elif not os.path.exists(lockdir):
                os.mkdir(lockdir)
        except OSError as ose:
            # Swallow all "File exists" errors - another thread/process beat us to the punch.
            # FIX: use errno.EEXIST instead of the magic number 17.
            if ose.errno != errno.EEXIST:
                raise

        logfile = logfile_name
        # A bare file name (no directory component) goes to the standard Splunk log directory.
        if os.path.basename(logfile_name) == logfile_name:
            logfile = make_splunkhome_path(['var', 'log', 'splunk', logfile_name])
        # Note that there are still some issues with windows here, going to make it so that we dont
        file_handler = logging.handlers.RotatingFileHandler(logfile, maxBytes=2500000, backupCount=5)
        file_handler.setFormatter(handler_format if handler_format else TZMillisLogFormatter(log_format))
        logger.addHandler(file_handler)

    if is_console_header and not has_console_handler:
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(handler_format if handler_format else TZMillisLogFormatter(log_format))
        logger.addHandler(console_handler)

    # Read logging level information from log.cfg so it will overwrite log level.
    # Note if logger level is specified on that file then it will overwrite log level
    LOGGING_DEFAULT_CONFIG_FILE = make_splunkhome_path(['etc', 'log.cfg'])
    LOGGING_LOCAL_CONFIG_FILE = make_splunkhome_path(['etc', 'log-local.cfg'])
    LOGGING_STANZA_NAME = 'python'
    setupSplunkLogger(
        logger,
        LOGGING_DEFAULT_CONFIG_FILE,
        LOGGING_LOCAL_CONFIG_FILE,
        LOGGING_STANZA_NAME,
        verbose=False
    )
    return logger


class InstrumentCall(object):
    """
    Instrument a call - i.e. see how long this thing takes and potentially trace through it
    in order to gain understanding of how long it takes for either
    1) This particular method to run or
    2) What other things is this method doing on the inside.
    If you just want to put transaction tracing on a method - use @InstrumentCall(logger)
    to decorate your method
    If you want to do more detailed tracing, use the push and the pop methods to instrument
    what you want to trace through.  Use the transaction_id returned (recommended) by the first
    push or define your own (doable, but be careful of duplicate values in multi-threaded
    environments
    """

    def __init__(self, logger, loginfo=True):
        """
        Create the instrument call object (half decorator, half object)

        @param loginfo: A flag indicating that we want to log at info (vs debug)
        @param logger: The logger to log to
        """
        self.logger = logger
        self.loginfo = loginfo
        # Per-thread storage for start times/owners so concurrent pushes don't collide.
        self.threadsafe_storage = threading.local()

    def __call__(self, f):
        """
        Decorator to instrument object methods

        @InstrumentCall(logger)
        def add(self, a, b):

        :param f: (func) Function to wrap
        :return: (func) Wrapped function
        """
        def wrapper(decorated_self, *args, **kwargs):
            method_name = None
            try:
                # Separate assignments for separate possible errors
                method_name = f.__name__
                method_name = "%s.%s" % (decorated_self.__class__.__name__, method_name)
            except Exception:
                if not method_name:
                    method_name = str(f)

            # Pull out transaction ID
            transaction_id = kwargs.get("transaction_id")
            # Check *args for transaction_id
            if not transaction_id:
                for i, param in enumerate(inspect.signature(f).parameters):
                    if param == "transaction_id":
                        # FIX: guard the index - transaction_id may be declared in the
                        # signature but supplied via its default value, in which case
                        # args is shorter and args[i - 1] raised IndexError.
                        if 0 <= i - 1 < len(args):
                            transaction_id = args[i - 1]  # self -> decorated_self
                        break
            if not transaction_id:
                transaction_id = getattr(getattr(decorated_self, 'itsi_muh', None), 'transaction_id', None) or \
                    getattr(decorated_self, 'transaction_id', None)

            transaction_id = self.push(method_name, transaction_id=transaction_id)
            retval = f(decorated_self, *args, **kwargs)
            self.pop(method_name, transaction_id)
            return retval
        return wrapper

    @contextmanager
    def track(self, method, transaction_id=None, owner=None, metric_info=None):
        """
        Context manager usage

        :param method: (str) Method name ("metricName" in instrumentation)
        :param transaction_id: (str) Transaction ID for tying together different function calls
        :param owner: (str) Owner name
        :param metric_info: (dict) Key-value pairs to log additional information
                            Note: Don't use "@@@" or keys with spaces to work with telemetry saved searches correctly
        :yield: Transaction ID
        """
        transaction_id = self.push(method, transaction_id=transaction_id, owner=owner, metric_info=metric_info)
        try:
            yield transaction_id
        finally:
            self.pop(method, transaction_id=transaction_id, metric_info=metric_info)

    def push(self, method, transaction_id=None, owner=None, metric_info=None):
        """
        Push based on the passed in transaction ID

        :param method: (str) Method name ("metricName" in instrumentation)
        :param transaction_id: (str) Transaction ID for tying together different function calls
        :param owner: (str) Owner name
        :param metric_info: (dict) Key-value pairs to log additional information
                            Note: Don't use "@@@" or keys with spaces to work with telemetry saved searches correctly
        :return: (str) Transaction ID (can be shared among calls); generated when not supplied
        """
        if self.loginfo:
            log_method = self.logger.info
        else:
            log_method = self.logger.debug

        if transaction_id is None:
            transaction_id = uuid1().hex
        if owner is None:
            owner = "None"
        unified_id = "%s:%s" % (transaction_id, method)
        start_time = time()

        # Lazily initialize the per-thread bookkeeping dicts.
        if not hasattr(self.threadsafe_storage, "start_times"):
            self.threadsafe_storage.start_times = {}
            self.threadsafe_storage.owners = {}
        self.threadsafe_storage.start_times[unified_id] = start_time
        self.threadsafe_storage.owners[unified_id] = owner

        metric_info_str = ""
        if metric_info:
            for k, v in metric_info.items():
                metric_info_str += " metric_info.%s=%s" % (k, "\"%s\"" % v if isinstance(v, str) else v)

        log_method("Invoked tid=%s method=%s start_time=%s owner='%s'%s",
                   transaction_id, method, start_time, owner, metric_info_str)
        return transaction_id

    def pop(self, method, transaction_id, metric_info=None):
        """
        Pop based on the transaction ID and call ID

        @param method: (str) Method name ("metricName" in instrumentation)
        @param transaction_id: (str) Transaction ID for tying together different function calls
        @param metric_info: (dict) Key-value pairs to log additional information
                            Note: Don't use "@@@" or keys with spaces to work with telemetry saved searches correctly
        """
        end_time = time()
        if self.loginfo:
            log_method = self.logger.info
        else:
            log_method = self.logger.debug
        unified_id = "%s:%s" % (transaction_id, method)

        # FIX: a pop() on a thread that never pushed used to raise AttributeError
        # (thread-local attributes missing); log an error instead of crashing.
        start_times = getattr(self.threadsafe_storage, "start_times", {})
        owners = getattr(self.threadsafe_storage, "owners", {})
        # FIX: remove the entries unconditionally (previously the sibling entry leaked
        # when only one of start_time/owner was found).
        # In longer runs, we would run into a memory problem, although unlikely, its easier to deal with now
        start_time = start_times.pop(unified_id, None)
        owner = owners.pop(unified_id, None)
        if not start_time:
            self.logger.error("Timing information could not be determined tid=%s", unified_id)
            return
        if not owner:
            self.logger.error("Ownership information could not be determined tid=%s", unified_id)
            return

        metric_info_str = ""
        if metric_info:
            for k, v in metric_info.items():
                metric_info_str += " metric_info.%s=%s" % (k, "\"%s\"" % v if isinstance(v, str) else v)

        transaction_time = end_time - start_time
        log_method("Finished tid=%s method=%s start_time=%s end_time=%s transaction_time=%s owner='%s'%s",
                   transaction_id, method, start_time, end_time, transaction_time, owner, metric_info_str)


UNTRACKED_LOG = "itsi.untracked"
UNTRACKED_LOG_FILE = "itsi_untracked.log"


def currentframe():
    """
    Set the current frame so Python logging can find the callers of ItsiLogger functions.
    """
    return sys._getframe(3)


# Point logging at this module's file so findCaller() skips the ItsiLogger wrapper
# frames and reports the real call site in %(module)s / %(funcName)s / %(lineno)d.
logging._srcfile = os.path.normcase(currentframe.__code__.co_filename)

"""
Singleton Meta Class for ITSI logging
To be more precisely, this is singleton + proxy approach that allow dependencies initialize logger first and use later.
As long as process entry point initialize it with logger name and logger file before sub module use it,
sub module can still log to the file that entry point specified.
It doesn't hurt for the sequence of getting/initializing the logger.
It is only hurt when sub module use it before entry point initialize the logger.
In this 'bad' case(that may cause log conflict in multi-process environment), the logs go to UNTRACKED_LOG_FILE.
It is developer's responsibility that NOT logging to UNTRACKED_LOG_FILE.
"""


class Singleton(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        log = cls._instances[cls]
        if (len(args) >= 1 or 'logger_name' in kwargs) \
                and log.logger.name == UNTRACKED_LOG:
            # Reset logger from entry point.
            # FIX: iterate over a copy - removing handlers from the live list while
            # iterating it skips every other handler, leaving some open/attached.
            for h in list(log.logger.handlers):
                # Make sure the old handlers are completely closed.
                h.close()
                log.logger.removeHandler(h)
            old_logger = log.logger
            try:
                log.logger = setup_logging(*args, **kwargs)
            except IOError as e:
                # Keep the previous (untracked) logger alive if the new log file
                # cannot be opened, so logging never brings the process down.
                old_logger.exception(e)
                log.logger = old_logger
        return log


"""
A wrapper class for logger by calling setup_logging, and it is a singleton class
"""


class ItsiLogger(metaclass=Singleton):
    def __init__(self, logfile_name=UNTRACKED_LOG_FILE, logger_name=UNTRACKED_LOG, level=logging.INFO,
                 is_console_header=False, log_format=LOG_DEFAULT_FMT):
        """
        @param logfile_name: the log file name
        @param logger_name: the name of the logger.
               This class is singleton and initialized at the entry point of process.
               And the logger name will be set by the entry point
               If the entry point does't initialize it, the first python module that constructs this object
               will set the name of this logger.
        @param level: logging level
        @param is_console_header: set to true if console logging is required
        @param log_format: log message format

        If the non-entry point of the process call the constructor, it will return the already created instance.
        But entry points should provide logfile_name and logger_name, otherwise, all logs belong to this process
        will log to UNTRACKED_LOG_FILE which should NEVER happen.
        Since UNTRACKED_LOG_FILE is a rescue approach to prevent ITSI from stop running.
        """
        self.logfile_name = logfile_name
        self.logger_name = logger_name
        self.logger = setup_logging(logfile_name, logger_name, level=level,
                                    is_console_header=is_console_header, log_format=log_format)
        # Sticky context key/values appended to every message (see addContext).
        self.props = {}
        self.props_string = ''

    def __process_msg(self, msg, kwargs):
        # Fold the sticky context and any per-call 'props' dict into the message,
        # and strip 'props' from kwargs before delegating to the real logger.
        props_msg = ''
        if 'props' in kwargs and isinstance(kwargs['props'], dict):
            props_msg = ' ' + ' '.join(k + "=" + str(kwargs['props'][k]) for k in kwargs['props'])
            del kwargs['props']
        if self.props_string or props_msg:
            msg = str(msg)
            msg += self.props_string + props_msg
        return msg

    def addContext(self, props):
        self.props.update(props)
        self.props_string = ' ' + ' '.join(k + "=" + str(self.props[k]) for k in self.props)

    def cleanUpContext(self):
        self.props = {}
        self.props_string = ''

    def exception(self, msg, *args, **kwargs):
        msg = self.__process_msg(msg, kwargs)
        self.logger.exception(msg, *args, **kwargs)

    def error(self, msg, *args, **kwargs):
        msg = self.__process_msg(msg, kwargs)
        self.logger.error(msg, *args, **kwargs)

    def warning(self, msg, *args, **kwargs):
        msg = self.__process_msg(msg, kwargs)
        self.logger.warning(msg, *args, **kwargs)

    warn = warning

    def info(self, msg, *args, **kwargs):
        msg = self.__process_msg(msg, kwargs)
        self.logger.info(msg, *args, **kwargs)

    def debug(self, msg, *args, **kwargs):
        msg = self.__process_msg(msg, kwargs)
        self.logger.debug(msg, *args, **kwargs)

    def setLevel(self, level):
        self.logger.setLevel(level)

    def addFilter(self, filter):
        self.logger.addFilter(filter)

    def getLogFilePath(self):
        return make_splunkhome_path(['var', 'log', 'splunk', self.logfile_name])


"""
Default Logger
"""
logger = ItsiLogger()


def getLogger4ModInput(input_config, logger_name=None):
    """
    Function to get logger for Modular Inputs

    @param input_config: The stanza config section
    @param logger_name: The logger name
    @return: The singleton logger
    """
    # Use module name as stanza name if it is empty
    stanza_name = __name__ + '://' + __name__ if input_config is None or len(input_config) == 0 else \
        list(input_config)[0]
    sourcetype, name = stanza_name.split('://')
    return ItsiLogger("%s-%s.log" % (sourcetype if sourcetype.startswith('itsi_') else 'itsi_' + sourcetype, name),
                      logger_name if logger_name else
                      "itsi." + sourcetype[5:] if sourcetype.startswith('itsi_') else sourcetype)


def getLogger4SearchCmd(logger_name=None, level=logging.INFO, is_console_header=False, has_header=True,
                        return_all=False):
    """
    Function to get logger for search command. The logger writes to itsi_search.log inside
    the search's dispatch directory, so no target log file argument is required. It can also
    return the settings and records read from the search input.

    @param logger_name: The logger name
    @param level: The level of the logger
    @param is_console_header: Do we need console output for this logger?
    @param has_header: Do we need skip headers while reading settings from input?
    @param return_all: Do we need return settings and record?
    @return: Tuple of 3 elements: the singleton logger, the settings, and the record returned from readResults.
    """
    settings = dict()
    record = readResults(settings=settings, has_header=has_header)
    if logger_name is None:
        # Derive the logger name from the executing script, stripping ".py" and
        # an optional "command_" prefix.
        exec_file = os.path.basename(sys.argv[0])[:-3]
        logger_name = exec_file[8:] if exec_file.startswith('command_') else exec_file
    log_file = make_splunkhome_path(['var', 'run', 'splunk', 'dispatch', settings['sid'], SEARCH_LOG_FILE])
    logger = ItsiLogger(log_file, logger_name, level=level, is_console_header=is_console_header)
    if return_all:
        return (logger, settings, record)
    return logger


def formatLoggerNameAndFile(logger_name, logger_file):
    """
    Function to get logger name and logger file for given parameters

    @param logger_name: The logger name
    @param logger_file: The logger_file
    @return: Tuple of 2 elements: the prefixed logger_name, the prefixed logger_file
    """
    exec_file = os.path.basename(sys.argv[0])[:-3]
    if exec_file == 'root':
        # REST implementation files are started by root.py,
        # but we want get the implementation file from module_file
        exec_file = 'itsi_services'
    if logger_name is None:
        logger_name = "itsi." + exec_file[5:] if exec_file.startswith('itsi_') else exec_file
    if logger_file is None:
        logger_file = (exec_file if exec_file.startswith('itsi_') else 'itsi_' + exec_file) + ".log"
    return logger_name, logger_file


def getLogger(logger_name=None, logger_file=None, level=logging.INFO, is_console_header=False,
              log_format=LOG_DEFAULT_FMT):
    """
    Function to get logger for other Python process entry files.

    @param logger_name: The logger name
    @param logger_file: The logger_file
    @param level: The level of the logger
    @param is_console_header: Do we need console output for this logger?
    @param log_format: The format of the logger
    @return: The singleton logger
    """
    log_name, log_file = formatLoggerNameAndFile(logger_name, logger_file)
    return ItsiLogger(log_file, log_name, level=level, is_console_header=is_console_header, log_format=log_format)