# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.

import json

from ITOA.itoa_exceptions import ItoaValidationError

MONGODB_QUERY_OPTIONS = {"ignoreCase": True}


class CustomException(Exception):
    pass


class CommonUtils:
    """Helpers that translate UI filter objects into MongoDB-style regex queries."""

    def __init__(self, logger):
        """
        :param logger: logger object, passed along to every ItoaValidationError raised here
        """
        self.logger = logger

    def get_query_from_request_args(self, query):
        """
        Converts UI query to mongoDB format

        :param query: JSON query string from UI
        :return: JSON string of the mongoDB query, or '' when query is empty/None
        :raises ItoaValidationError: when query is not valid JSON
        """
        if not query:
            return ''
        try:
            return json.dumps(self.build_mongodb_query(json.loads(query)))
        except ValueError:
            # Raise ItoaValidationError when invalid JSON is provided for the mongodb
            # query; it gives a cleaner message than the default ValueError.
            raise ItoaValidationError('Invalid JSON supplied as part of query!', self.logger)

    def build_mongodb_query(self, filters, options=None):
        """
        Builds a mongoDB query dictionary from a UI filters dictionary.

        Examples below use the default options (case-insensitive match):

        Ex1: for a input {"title": "a*"} this function would return
            {"title": {"$regex": "^a.*$", "$options": "i"}}
        Ex2: for a input {"title": ["a*", "b"]} returns:
            {"$or": [{"title": {"$regex": "^a.*$", "$options": "i"}},
                     {"title": {"$regex": "^b$", "$options": "i"}}]}
        Ex3: for a input {"title": ["a*", "b"], "os": "windows"} returns:
            {"$and": [
                {"$or": [{"title": {"$regex": "^a.*$", "$options": "i"}},
                         {"title": {"$regex": "^b$", "$options": "i"}}]},
                {"os": {"$regex": "^windows$", "$options": "i"}}
            ]}

        NOTE: This code needs to be updated when https://jira.splunk.com/browse/PBL-9076 is implemented.

        :param filters: dictionary, filters object passed by the UI
        :param options: dictionary of supported options; defaults to MONGODB_QUERY_OPTIONS
        :return: mongoDB format query dictionary
        :raises ItoaValidationError: when filters/options are not dictionaries, or a
            filter value is not a string or a list of strings
        """
        # if filter is not a dict or options if passed in is not a dict a error will be thrown
        if not isinstance(filters, dict) or (options is not None and not isinstance(options, dict)):
            raise ItoaValidationError('Filter/options are expected to be a dict', self.logger)

        # if filters is empty object return as no query constructions required
        if not filters:
            return filters

        # NOTE(review): an empty options dict also falls back to the defaults here.
        options = options or MONGODB_QUERY_OPTIONS
        mongo_options = self._construct_mongo_options(options)

        sub_queries = [
            self._construct_query_for(key, value, mongo_options)
            for key, value in filters.items()
        ]

        # if number of sub-queries is 1 return else wrap it around a "$and"
        return sub_queries[0] if len(sub_queries) == 1 else {"$and": sub_queries}

    def _construct_query_for(self, key, value, options):
        """
        If value is a list it would return {"$or": [{"key": <regex a>}, {"key": <regex b>}]}
        else it would return {"key": <regex value>}

        :param key: field name to filter on
        :param value: a string, or a list of strings, to match against the field
        :param options: mongo regex options string (for example "i")
        :return: query dictionary for this single field
        :raises ItoaValidationError: when value is neither a string nor a list,
            or when a list contains a non-string element
        """
        if not isinstance(value, (list, str)):
            raise ItoaValidationError('Value needs to be a string or list', self.logger)

        if isinstance(value, str):
            return {key: self.get_regex_search_string(value, options)}

        # Reject non-string elements up front: they would otherwise fail with a raw
        # TypeError (e.g. an int) or be silently formatted into a meaningless regex
        # (e.g. a nested list or dict) further down.
        if not all(isinstance(element, str) for element in value):
            raise ItoaValidationError('List values need to be strings', self.logger)

        return {'$or': [{key: self.get_regex_search_string(element, options)} for element in value]}

    def get_regex_search_string(self, value, options):
        """
        Builds the anchored "$regex" clause for a single filter value.

        - It is not a good idea to use regex for all searches, instead we should start
          storing data in lowercase always.

        :param value: string to match; every '*' is treated as a wildcard
        :param options: mongo regex options string (for example "i")
        :return: dictionary of the form {"$regex": "^<value>$", "$options": options}
        """
        # NOTE(review): value is not regex-escaped, so other regex metacharacters in
        # the filter are passed through to the pattern as-is - confirm this is intended.
        if '*' in value:
            # need to do this to handle a filter like host:m*, .* will allow chars after the match
            value = value.replace('*', '.*')
        return {"$regex": '^%s$' % value, "$options": options}

    def _construct_mongo_options(self, options):
        """
        :param options: dictionary of options for regex
        :return: a string for mongo query ("i" when ignoreCase is set and truthy, else "")
        """
        if "ignoreCase" in options and options['ignoreCase']:
            return "i"
        return ""