You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
118 lines
5.7 KiB
118 lines
5.7 KiB
|
|
# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
|
|
|
|
import json
|
|
import requests
|
|
from requests.status_codes import codes
|
|
from .constants import NOTABLE_EVENT_ENDPOINT, DEFAULT_NOTABLE_EVENT_BATCH_SIZE, NEAPS_ENDPOINT, DEFAULT_MAX_RETRY, DEFAULT_RETRY_INTERVAL
|
|
from ITOA.itoa_common import get_itsi_event_management_conf_field_value
|
|
import gzip
|
|
import time
|
|
|
|
|
|
class IngestService:
    """Client for pushing notable events and NEAPs to the SCS ingest service.

    Payloads are JSON-encoded, gzip-compressed, and POSTed to tenant-scoped
    endpoints with a bearer token; failed sends are retried with a fixed
    interval between attempts.
    """

    def __init__(self, logger, session_key, tenant_config, tenant_scope_scs_token):
        """
        Args:
            logger: logging instance
            session_key: Splunk session key used to read event-management conf values
            tenant_config: Instance of TenantConfig
            tenant_scope_scs_token: tenant scope SCS token
        """
        self.tenant_scope_scs_token = tenant_scope_scs_token
        self.logger = logger
        self.session_key = session_key
        self.tenant_config = tenant_config

    def get_endpoint(self, endpoint):
        """Build the absolute tenant-scoped URL for ``endpoint``.

        Args:
            endpoint: path suffix (e.g. the notable-event or NEAP endpoint).

        Returns:
            'https://<base>/<tenant><endpoint>', or None when the tenant
            base URL is unavailable (an error is logged in that case).
        """
        base_url = self.tenant_config.get_tenant_base_legacy_host_url()
        tenant_name = self.tenant_config.get_tenant_name()
        if not base_url:
            self.logger.error(f"Error getting tenant_base_url to make SCS REST API. tenant={tenant_name}")
            return
        return 'https://' + base_url + '/' + tenant_name + endpoint

    @staticmethod
    def _to_number(value, default, cast=int):
        """Coerce a conf-derived value to a number, falling back to ``default``.

        Conf lookups may yield None or non-numeric strings; the callers feed
        these values into range()/time.sleep(), so coerce defensively rather
        than crash mid-send.
        """
        if value is None:
            return default
        try:
            return cast(value)
        except (TypeError, ValueError):
            return default

    def send_events(self, events):
        """Push events to the ingest service in batches.

        Args:
            events: list of notable-event payloads (JSON-serializable).

        Returns:
            True when every batch was accepted, False on failure, or None
            when the endpoint could not be resolved.
        """
        create_notable_event_endpoint = self.get_endpoint(NOTABLE_EVENT_ENDPOINT)
        if create_notable_event_endpoint is None:
            return
        tenant_name = self.tenant_config.get_tenant_name()
        # Batch size is configurable; fall back to the default when unset or
        # not a valid integer (conf values may arrive as strings).
        notable_events_batch_size = self._to_number(
            get_itsi_event_management_conf_field_value(
                self.session_key, 'ingest_service', 'notable_events_batch_size'),
            DEFAULT_NOTABLE_EVENT_BATCH_SIZE)
        try:
            self.logger.info("Pushing events to High Scale EA for tenant: %s, total events=%s" % (tenant_name, len(events)))
            for i in range(0, len(events), notable_events_batch_size):
                batch_events = events[i:i + notable_events_batch_size]
                # BUGFIX: corrected typo "bacthes" -> "batches" in log message.
                self.logger.info("Pushing events to High Scale EA for tenant in batches : %s, total events=%s" % (tenant_name, len(batch_events)))
                response, content = self._send_data(create_notable_event_endpoint, batch_events)
                if response.status_code == codes.ok:
                    self.logger.info("Submitted events for High Scale EA. tenant=%s, result=%s, status=%s" % (tenant_name, content, response.status_code))
                else:
                    # NOTE(review): _send_data currently only returns on HTTP 200
                    # (it raises after exhausting retries), so this branch is a
                    # defensive guard rather than an expected path.
                    self.logger.error("Error pushing event to Ingest Service. tenant=%s, status=%s, error=%s" % (tenant_name, response.status_code, content))
                    return False
        except Exception as e:
            self.logger.exception('Exception occurred while sending events to ingest service : %s' % e)
            return False
        return True

    def send_neaps(self, policies):
        """Push NEAPs (notable event aggregation policies) to the ingest service.

        Args:
            policies: list of NEAP payloads (JSON-serializable).

        Returns:
            True on success, False on failure, or None when the endpoint
            could not be resolved.
        """
        create_neaps_endpoint = self.get_endpoint(NEAPS_ENDPOINT)
        if create_neaps_endpoint is None:
            return
        tenant_name = self.tenant_config.get_tenant_name()
        try:
            self.logger.info("Pushing NEAP's to High Scale EA for tenant : %s, total policies=%s" % (tenant_name, len(policies)))
            response, content = self._send_data(create_neaps_endpoint, policies)
            if response.status_code == codes.ok:
                self.logger.info("Submitted NEAP's for High Scale EA. tenant=%s, result=%s, status=%s" % (tenant_name, content, response.status_code))
                return True
            else:
                self.logger.error("Error pushing NEAP's to Ingest Service. tenant=%s, status=%s, error=%s" % (tenant_name, response.status_code, content))
                return False
        except Exception as e:
            self.logger.exception("Exception occurred while sending NEAP's to ingest service : %s" % e)
            return False

    def _send_data(self, endpoint, payload):
        """POST a gzip-compressed JSON payload to the ingest service with retries.

        Args:
            endpoint: absolute URL to POST to.
            payload: JSON-serializable object.

        Returns:
            (response, response.content) on HTTP 200.

        Raises:
            Exception: when all retry attempts fail.
        """
        headers = {
            "Authorization": f"Bearer {self.tenant_scope_scs_token}",
            "Accept-Encoding": "gzip", "Content-Encoding": "gzip",
            "Content-Type": "application/json"
        }

        # Compress once, outside the retry loop.
        compressed_payload = gzip.compress(bytes(json.dumps(payload), 'utf-8'))

        # Retry count and interval are configurable; coerce to numbers and
        # fall back to defaults when unset or invalid (conf may yield strings).
        max_retries = self._to_number(
            get_itsi_event_management_conf_field_value(self.session_key, 'ingest_service', 'max_retries'),
            DEFAULT_MAX_RETRY)
        retry_interval = self._to_number(
            get_itsi_event_management_conf_field_value(self.session_key, 'ingest_service', 'retry_interval'),
            DEFAULT_RETRY_INTERVAL, cast=float)

        for attempt in range(max_retries):
            try:
                response = requests.post(
                    endpoint,
                    headers=headers,
                    data=compressed_payload
                )
                if response.status_code == 200:
                    return response, response.content
                self.logger.error(f"Received non-200 while sending data to high-scale-ea. Status code: {response.status_code}. Retrying in {retry_interval} seconds...")
            except Exception as e:
                self.logger.exception(f"An error occurred while sending data to high-scale-ea: {str(e)}. Retrying in {retry_interval} seconds...")
            # BUGFIX: only sleep between attempts — the original slept after
            # the final failure, delaying the raise for no benefit.
            if attempt < max_retries - 1:
                time.sleep(retry_interval)

        raise Exception("All retry attempts failed. Could not send data to high-scale-ea.")
|