You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
110 lines
4.0 KiB
110 lines
4.0 KiB
# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
|
|
import json
|
|
|
|
from ITOA.itoa_exceptions import ItoaValidationError
|
|
|
|
|
|
MONGODB_QUERY_OPTIONS = {"ignoreCase": True}
|
|
|
|
|
|
class CustomException(Exception):
|
|
pass
|
|
|
|
|
|
class CommonUtils:
|
|
def __init__(self, logger):
|
|
self.logger = logger
|
|
|
|
def get_query_from_request_args(self, query):
|
|
"""
|
|
Converts UI query to mongoDB format
|
|
:param query query string from UI:
|
|
:return:
|
|
"""
|
|
if not query:
|
|
return ''
|
|
else:
|
|
try:
|
|
return json.dumps(self.build_mongodb_query(json.loads(query)))
|
|
# Throws ArgalidationException when we provide invalid JSON for mongodb query, a cleaner
|
|
# exception message than default ValueError returned
|
|
except ValueError:
|
|
raise ItoaValidationError('Invalid JSON supplied as part of query!', self.logger)
|
|
|
|
def build_mongodb_query(self, filters, options=None):
|
|
"""
|
|
Ex1: for a input {"title": ["a*"]}
|
|
this function would return {"title": {"$regex": "a*", , "$options": "i"}}
|
|
|
|
Ex2: for a input {"title": ["a*", "b"]}
|
|
returns: {"$or": [{"title": {"$regex": "a*", "$options": "i"}}, {"title": {"$regex": "b", , "$options": "i"}} ]}
|
|
|
|
Ex3: for a input {"title": ["a*", "b"], "os": ["windows"]}
|
|
returns: {"$and": [
|
|
{"$or": [{"title": {"$regex": "a*", "$options": "i"}}, {"$regex": "b", , "$options": "i"}} ]},
|
|
{"os": {"$regex": "windows", , "$options": "i"}}}
|
|
]}
|
|
|
|
NOTE: This code needs to be updated when https://jira.splunk.com/browse/PBL-9076 is implemented.
|
|
|
|
:param filters dictionary, filters object passed by the UI
|
|
:param options dictionary of supported options
|
|
:return mongoDB format query dictionary:
|
|
"""
|
|
|
|
# if filter is not a dict or options if passed in is not a dict a error will be thrown
|
|
if not isinstance(filters, dict) or (options is not None and not isinstance(options, dict)):
|
|
raise ItoaValidationError('Filter/options are expected to be a dict', self.logger)
|
|
|
|
# if filters is empty object return as no query constructions required
|
|
if not bool(filters):
|
|
return filters
|
|
|
|
options = options if options else MONGODB_QUERY_OPTIONS
|
|
mongo_options = self._construct_mongo_options(options)
|
|
sub_queries = [self._construct_query_for(key, value, mongo_options) for key, value in list(filters.items())]
|
|
|
|
# if number of sub-queries is 1 return else wrap it around a "$and"
|
|
return sub_queries[0] if len(sub_queries) == 1 else {"$and": sub_queries}
|
|
|
|
def _construct_query_for(self, key, value, options):
|
|
"""
|
|
If values is a list it would return
|
|
{"$or": [{"key":"a"}, {"key": "b"}]}
|
|
|
|
else it would return
|
|
{"key": "value"}
|
|
:return:
|
|
"""
|
|
if not (isinstance(value, list) or isinstance(value, str)):
|
|
raise ItoaValidationError('Value needs to be a string or list', self.logger)
|
|
|
|
item = {}
|
|
if isinstance(value, list):
|
|
item['$or'] = [{key: self.get_regex_search_string(v, options)} for v in value]
|
|
else:
|
|
item[key] = self.get_regex_search_string(value, options)
|
|
return item
|
|
|
|
def get_regex_search_string(self, value, options):
|
|
"""
|
|
- It is not a good idea to use regex for all searches , instead we should start storing
|
|
data in lowercase always.
|
|
"""
|
|
if '*' in value:
|
|
# need to do this to handle a filter like host:m*, .* will allow chars after the match
|
|
value = value.replace('*', '.*')
|
|
|
|
return {"$regex": '^%s$' % value, "$options": options}
|
|
|
|
def _construct_mongo_options(self, options):
|
|
"""
|
|
:param options dictionary of options for regex:
|
|
:return a string for mongo query:
|
|
"""
|
|
options_str = ""
|
|
if "ignoreCase" in options and options['ignoreCase']:
|
|
options_str += "i"
|
|
|
|
return options_str
|