You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
108 lines
3.9 KiB
108 lines
3.9 KiB
#!/usr/bin/env python
|
|
# coding=utf-8
|
|
#
|
|
# Copyright (C) 2009-2021 Splunk Inc. All Rights Reserved.
|
|
|
|
from __future__ import absolute_import, division, print_function, unicode_literals
|
|
import default
|
|
import app
|
|
|
|
from splunklib.searchcommands import dispatch, GeneratingCommand, Configuration, Option, validators
|
|
from collections import Iterable
|
|
from itertools import chain
|
|
from json import JSONEncoder
|
|
from ldap3 import Connection, BASE
|
|
from ldap3.core.exceptions import LDAPException
|
|
from ldapsearch import LdapSearchCommand
|
|
from time import time
|
|
|
|
|
|
@Configuration(retainsevents=True)
|
|
class LdapTestConnectionCommand(GeneratingCommand):
|
|
""" Tests the connection to the directory service for a domain.
|
|
|
|
This command tests the connection to each of the hosts servicing an LDAP directory. It must be placed at the
|
|
beginning of a search pipeline:
|
|
|
|
.. code-block:: text
|
|
| ldaptestconnection domain=splunk.com
|
|
|
|
"""
|
|
domain = Option(
|
|
doc=''' Specifies the name of the configuration stanza representing the LDAP or Active Directory domain
|
|
connection to test.
|
|
''',
|
|
default='default')
|
|
|
|
debug = Option(
|
|
doc=''' True, if the logging_level should be set to DEBUG; otherwise False.
|
|
**Default:** The current value of logging_level.
|
|
''',
|
|
default=False, validate=validators.Boolean())
|
|
|
|
def generate(self):
|
|
"""
|
|
:return: `None`.
|
|
|
|
"""
|
|
self.logger.debug('Command = %s', self)
|
|
configuration = app.Configuration(self)
|
|
|
|
encoder = JSONEncoder(ensure_ascii=False, separators=(',', ':'))
|
|
time_stamp = time()
|
|
serial_number = 0
|
|
|
|
servers = configuration.server if isinstance(configuration.server, Iterable) else (configuration.server,)
|
|
search_base = configuration.basedn
|
|
search_filter = '(objectClass=*)'
|
|
search_scope = BASE
|
|
attribute_names = 'distinguishedName',
|
|
|
|
records = []
|
|
errors = []
|
|
|
|
for server in servers:
|
|
self.logger.debug('Testing the connection to %s', server.name)
|
|
try:
|
|
with Connection(
|
|
server,
|
|
read_only=True,
|
|
raise_exceptions=True,
|
|
user=configuration.credentials.username,
|
|
password=configuration.credentials.password) as connection:
|
|
|
|
# LDAP Guarantee: There's one and only one response to our query (proof left as an exercise)
|
|
|
|
if connection.search(search_base, search_filter, search_scope, attributes=attribute_names):
|
|
response = connection.response[0]
|
|
attributes = app.get_attributes(self, response)
|
|
records.append(
|
|
LdapSearchCommand._record(
|
|
serial_number, time_stamp, server.host, response['dn'], attributes, attribute_names,
|
|
encoder))
|
|
else:
|
|
message = 'The directory serviced at {0} contains no entry for {1}'.format(
|
|
server.name, search_base)
|
|
errors.append((server.host, message))
|
|
pass
|
|
|
|
except LDAPException as error:
|
|
message = 'Could not access the directory service at {0}: {1}'.format(
|
|
server.name, app.get_ldap_error_message(error, configuration))
|
|
errors.append((server.host, message))
|
|
|
|
serial_number += 1
|
|
|
|
if errors:
|
|
message = ' # host: '.join(chain(' ', [
|
|
'{0}: {1}'.format(host, message.replace(u'\x00', '')) for host, message in errors]))
|
|
self.error_exit(ValueError(message), message)
|
|
|
|
for record in records:
|
|
yield record
|
|
|
|
return
|
|
|
|
|
|
dispatch(LdapTestConnectionCommand, module_name=__name__)
|