You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

192 lines
9.9 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import json
import sys
import nats
import splunk.rest as rest
from splunk.clilib.bundle_paths import make_splunkhome_path
sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib']))
sys.path.append(make_splunkhome_path(['etc', 'apps', 'SA-ITOA', 'lib', 'SA_ITOA_app_common']))
from SA_ITOA_app_common.solnlib.conf_manager import ConfManager
from ITOA.itoa_common import get_itsi_event_management_nats_certificate_value, get_nats_credentials, get_peers, is_cloud
import ssl
import socket
class NatsEventPublisher(object):
    """
    Push Event to NATS Server using instance of this class.

    ``push_events_to_nats`` is the one-shot entry point: it calls
    ``setup_NATS`` (connect + make sure the JetStream stream exists with the
    configured settings), publishes every event and closes the connection.
    """

    # JetStream stream and subject the events are published to.
    _STREAM_NAME = 'itsi_indexes'
    _SUBJECT = 'itsi_tracked_alerts'
    # Port appended to auto-discovered peer hosts.
    _PEER_NATS_PORT = '4222'
    # Stream settings compared to decide whether an existing stream needs an update.
    _COMPARED_STREAM_KEYS = ('retention', 'max_age', 'num_replicas')

    def __init__(self, session_key, logger):
        """
        Initialize NATS publisher

        @param session_key: Splunk session key passed to the REST/conf helpers
        @param logger: logger used for all diagnostics of this instance
        @return:
        """
        self.session_key = session_key
        self.logger = logger
        # Populated by setup_NATS().
        self.nc = None
        self.js = None

    @staticmethod
    def _split_servers(raw_servers):
        """Split a comma separated server list, dropping whitespace and empty entries."""
        return [server.strip() for server in raw_servers.split(',') if server.strip()]

    @staticmethod
    def _build_server_urls(servers, clear_password):
        """
        Turn ``host:port`` entries into ``nats://`` URLs.

        @param servers: list of ``host:port`` strings
        @param clear_password: ``"<username>:<password>"`` or None for no credentials
        @return: list of server URLs
        @raise ValueError: if ``clear_password`` contains no ':' separator
        """
        if clear_password is None:
            # simply use the nats server url
            return [f'nats://{server}' for server in servers]
        # Split on the first ':' only so that passwords containing ':' stay intact.
        username, password = clear_password.split(':', 1)
        # Insert credentials before each server address
        return [f'nats://{username}:{password}@{server}' for server in servers]

    @staticmethod
    def _replicas_for_peers(peer_count, nats_settings):
        """Map the number of cluster peers to a replica count that is never below 1."""
        if peer_count >= 3:
            return int(nats_settings.get('stream_replication_factor', 3))
        return max(1, peer_count)

    async def add_or_update_stream(self, js, stream_config):
        """
        Create the stream described by ``stream_config``, or update it when its
        retention, max_age or num_replicas differ from the existing stream.

        @param js: JetStream context used for the stream calls
        @param stream_config: desired stream configuration
        @raise Exception: if the current stream info cannot be fetched for a
            reason other than the stream not existing
        """
        stream_name = stream_config.name
        # Fetch the current stream configuration
        try:
            current_info = await js.stream_info(stream_name)
        except nats.js.errors.NotFoundError:
            self.logger.info('No existing stream')
            current_info = None
        except Exception as e:
            self.logger.error(
                'Error occurred while fetching the current stream info %s', e)
            raise Exception('Error occurred while fetching the current stream info') from e

        if current_info is None:
            # Create the stream
            await js.add_stream(stream_config)
            self.logger.info(f"Stream '{stream_name}' created.")
            return

        # Compare only the settings this publisher manages.
        current_config = current_info.as_dict().get('config', {})
        new_config = stream_config.as_dict()
        needs_update = any(
            current_config.get(key) != new_config.get(key)
            for key in self._COMPARED_STREAM_KEYS
        )
        if needs_update:
            # Update the stream if needed
            await js.update_stream(stream_config)
            self.logger.info(f"Stream '{stream_name}' updated.")
        else:
            self.logger.info(f"Stream '{stream_name}' is already up-to-date.")

    def calculate_replicas_number(self, nats_settings):
        """
        Derive the stream replica count from the search head cluster status.

        @param nats_settings: mapping holding the optional
            ``stream_replication_factor`` setting
        @return: 1 when the status endpoint does not answer 200 or any error
            occurs; the configured replication factor (default 3) for three or
            more peers; otherwise the peer count, but never less than 1
        """
        try:
            response, content = rest.simpleRequest('/services/shcluster/status',
                                                   sessionKey=self.session_key,
                                                   getargs={'output_mode': 'json'},
                                                   raiseAllErrors=False)
            if response and response.status != 200:
                # no peers or shc is not supported
                return 1
            content = json.loads(content)
            peers = content['entry'][0]['content']['peers']
            return self._replicas_for_peers(len(peers), nats_settings)
        except Exception as e:
            self.logger.error('Failed to get the shc info %s', e)
            return 1

    def _resolve_servers(self, nats_settings, nats_servers):
        """
        Return the ``host:port`` list to connect to: discovered peers when
        auto-discovery is enabled and yields peers, else the configured list.
        """
        configured_servers = self._split_servers(nats_servers)
        try:
            auto_discover = int(nats_settings.get('nats_auto_discover_publisher_nodes', 1))
            if auto_discover == 1:
                host_name = socket.gethostname()
                peers = get_peers(self.logger, self.session_key, host_name, True)
                if peers:
                    return [peer + ':' + self._PEER_NATS_PORT for peer in peers]
        except Exception as e1:
            # Best effort: discovery problems fall back to the configured servers.
            self.logger.exception('Exception while fetching peer nodes : %s', e1)
        return configured_servers

    def _build_ssl_context(self):
        """Create the SSL context and load the configured client certificate and key."""
        # NOTE(review): Purpose.CLIENT_AUTH is documented for server-side sockets;
        # an outbound connection normally uses Purpose.SERVER_AUTH. Left unchanged
        # because switching enables server certificate verification, which needs a
        # trusted CA to be configured - confirm before changing.
        ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
        cert_directory = get_itsi_event_management_nats_certificate_value(
            self.session_key, 'nats_queue', 'cert_directory')
        client_cert = get_itsi_event_management_nats_certificate_value(
            self.session_key, 'nats_queue', 'client_cert')
        client_cert_key = get_itsi_event_management_nats_certificate_value(
            self.session_key, 'nats_queue', 'client_cert_key')
        client_cert_path = make_splunkhome_path([cert_directory, client_cert])
        client_cert_key_path = make_splunkhome_path([cert_directory, client_cert_key])
        self.logger.info('SSL Client certificate file location: %s, SSL Client certificate key file location: %s', client_cert_path, client_cert_key_path)
        ssl_ctx.load_cert_chain(certfile=client_cert_path, keyfile=client_cert_key_path)
        return ssl_ctx

    async def setup_NATS(self):
        """
        Read the ``itsi_nats`` configuration, connect to the NATS servers and
        make sure the stream exists with the configured settings.

        On success ``self.nc`` holds the connection and ``self.js`` the
        JetStream context.

        @raise Exception: 'Failed to setup nats client certificate.' when
            reading the configuration, credentials or certificates fails;
            'Failed to connect nats-server.' when connecting or preparing the
            stream fails
        """
        ssl_ctx = None
        try:
            cfm = ConfManager(self.session_key, 'SA-ITOA')
            conf = cfm.get_conf('itsi_nats')
            nats_settings = conf.get('nats_settings')
            # get nats configurations
            require_tls_client_cert_cloud = int(nats_settings.get('require_tls_client_cert_cloud', 1))
            require_tls_client_cert_on_prem = int(nats_settings.get('require_tls_client_cert_on_prem', 0))
            is_cloud_stack = is_cloud(self.logger, self.session_key)
            tls_enabled = (
                (is_cloud_stack is True and require_tls_client_cert_cloud == 1)
                or (is_cloud_stack is False and require_tls_client_cert_on_prem == 1)
            )
            auth_enabled = int(nats_settings.get('require_auth', 1))
            retention_max_age = int(nats_settings.get('retention_max_age', 3600))
            nats_server_connect_time = int(nats_settings.get('nats_server_connect_time', 5))
            nats_max_reconnect_attempts = int(nats_settings.get('nats_max_reconnect_attempts', 3))
            nats_reconnect_time_wait = int(nats_settings.get('nats_reconnect_time_wait', 5))
            replicas = self.calculate_replicas_number(nats_settings)
            nats_servers = nats_settings.get('nats_servers', '127.0.0.1:4222')
            list_of_nats_servers = self._resolve_servers(nats_settings, nats_servers)

            # get nats credentials from storage/passwords
            passwords_uri = "/services/storage/passwords/nats-admin?output_mode=json"
            credentials = get_nats_credentials(self.session_key, passwords_uri, auth_enabled)
            clear_password = credentials['clear_password'] if credentials else None
            prefixed_nats_servers = self._build_server_urls(list_of_nats_servers, clear_password)

            self.logger.info('NATS configuration: tls_enabled=%s, retention_max_age=%s, nats_servers=%s, '
                             'server_connect_time=%s, max_reconnect_attempts=%s, nats_reconnect_time_wait=%s',
                             tls_enabled, retention_max_age, nats_servers, nats_server_connect_time,
                             nats_max_reconnect_attempts, nats_reconnect_time_wait)
            if tls_enabled:
                ssl_ctx = self._build_ssl_context()
        except Exception as e:
            # Pass the exception itself: e.args may be empty, so e.args[0] is unsafe.
            self.logger.exception('Failed to setup nats client certificate : %s', e)
            raise Exception('Failed to setup nats client certificate.') from e

        connect_kwargs = {
            'servers': prefixed_nats_servers,
            'connect_timeout': nats_server_connect_time,
            'max_reconnect_attempts': nats_max_reconnect_attempts,
            'reconnect_time_wait': nats_reconnect_time_wait,
        }
        if ssl_ctx is not None:
            connect_kwargs['tls'] = ssl_ctx

        connection = None
        try:
            connection = await nats.connect(**connect_kwargs)
            self.nc = connection
            self.js = connection.jetstream()
            # retention: max_msgs=1000000, max_bytes=20000, max_age=3600
            # for now, only set the max age to 1 hour
            # and set replicas to the number in itsi_nats.conf if it is shc
            stream_config = nats.js.api.StreamConfig(
                name=self._STREAM_NAME,
                subjects=[self._SUBJECT],
                retention=nats.js.api.RetentionPolicy.LIMITS,
                max_age=retention_max_age,
                num_replicas=replicas,  # Number of replicas
            )
            await self.add_or_update_stream(self.js, stream_config)
        except Exception as e:
            self.logger.exception('Failed to connect nats-server : %s', e)
            if connection is not None:
                # Do not leak a connection whose stream setup failed.
                try:
                    await connection.close()
                except Exception as close_error:
                    self.logger.warning('Failed to close nats connection : %s', close_error)
            raise Exception('Failed to connect nats-server.') from e

    async def push_events_to_nats(self, events):
        """
        Connect, publish every event as JSON to the tracked-alerts subject and
        close the connection again.

        @param events: iterable of JSON-serializable events
        @raise Exception: 'Failed to connect nats-server.' when setup,
            publishing or closing fails
        """
        try:
            await self.setup_NATS()
            try:
                for event in events:
                    await self.js.publish(self._SUBJECT, json.dumps(event).encode())
            finally:
                # Close NATS connection, also when a publish failed
                await self.nc.close()
        except Exception as e:
            self.logger.exception('Failed to connect nats-server : %s', e)
            raise Exception('Failed to connect nats-server.') from e