You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

118 lines
5.7 KiB

# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
import json
import requests
from requests.status_codes import codes
from .constants import NOTABLE_EVENT_ENDPOINT, DEFAULT_NOTABLE_EVENT_BATCH_SIZE, NEAPS_ENDPOINT, DEFAULT_MAX_RETRY, DEFAULT_RETRY_INTERVAL
from ITOA.itoa_common import get_itsi_event_management_conf_field_value
import gzip
import time
class IngestService:
"""Ingest Service.
"""
def __init__(self, logger, session_key, tenant_config, tenant_scope_scs_token):
"""
Args:
logger: logging instance
tenant_config: Instance of TenantConfig
tenant_scope_scs_token: tenant scope SCS token
"""
self.tenant_scope_scs_token = tenant_scope_scs_token
self.logger = logger
self.session_key = session_key
self.tenant_config = tenant_config
def get_endpoint(self, endpoint):
base_url = self.tenant_config.get_tenant_base_legacy_host_url()
tenant_name = self.tenant_config.get_tenant_name()
if not base_url:
self.logger.error(f"Error getting tenant_base_url to make SCS REST API. tenant={tenant_name}")
return
return 'https://' + base_url + '/' + tenant_name + endpoint
def send_events(self, events):
"""Push events to ingest service.
"""
create_notable_event_endpoint = self.get_endpoint(NOTABLE_EVENT_ENDPOINT)
if create_notable_event_endpoint is None:
return
tenant_name = self.tenant_config.get_tenant_name()
notable_events_batch_size = get_itsi_event_management_conf_field_value(self.session_key, 'ingest_service', 'notable_events_batch_size')
if notable_events_batch_size is None:
notable_events_batch_size = DEFAULT_NOTABLE_EVENT_BATCH_SIZE
try:
self.logger.info("Pushing events to High Scale EA for tenant: %s, total events=%s" % (tenant_name, len(events)))
for i in range(0, len(events), notable_events_batch_size):
batch_events = events[i:i + notable_events_batch_size]
self.logger.info("Pushing events to High Scale EA for tenant in bacthes : %s, total events=%s" % (tenant_name, len(batch_events)))
response, content = self._send_data(create_notable_event_endpoint, batch_events)
if response.status_code == codes.ok:
self.logger.info("Submitted events for High Scale EA. tenant=%s, result=%s, status=%s" % (tenant_name, content, response.status_code))
else:
self.logger.error("Error pushing event to Ingest Service. tenant=%s, status=%s, error=%s" % (tenant_name, response.status_code, content))
return False
except Exception as e:
self.logger.exception('Exception occurred while sending events to ingest service : %s' % e)
return False
return True
def send_neaps(self, policies):
"""Push NEAP's to ingest service.
"""
create_neaps_endpoint = self.get_endpoint(NEAPS_ENDPOINT)
if create_neaps_endpoint is None:
return
tenant_name = self.tenant_config.get_tenant_name()
try:
self.logger.info("Pushing NEAP's to High Scale EA for tenant : %s, total policies=%s" % (tenant_name, len(policies)))
response, content = self._send_data(create_neaps_endpoint, policies)
if response.status_code == codes.ok:
self.logger.info("Submitted NEAP's for High Scale EA. tenant=%s, result=%s, status=%s" % (tenant_name, content, response.status_code))
return True
else:
self.logger.error("Error pushing NEAP's to Ingest Service. tenant=%s, status=%s, error=%s" % (tenant_name, response.status_code, content))
return False
except Exception as e:
self.logger.exception("Exception occurred while sending NEAP's to ingest service : %s" % e)
return False
def _send_data(self, endpoint, payload):
"""Create data in ingest service.
"""
headers = {
"Authorization": f"Bearer {self.tenant_scope_scs_token}",
"Accept-Encoding": "gzip", "Content-Encoding": "gzip",
"Content-Type": "application/json"
}
compressed_payload = gzip.compress(bytes(json.dumps(payload), 'utf-8'))
# Define the maximum number of retry attempts
max_retries = get_itsi_event_management_conf_field_value(self.session_key, 'ingest_service', 'max_retries')
if max_retries is None:
max_retries = DEFAULT_MAX_RETRY
# Define the interval (in seconds) between retries
retry_interval = get_itsi_event_management_conf_field_value(self.session_key, 'ingest_service', 'retry_interval')
if retry_interval is None:
retry_interval = DEFAULT_RETRY_INTERVAL
for retry in range(max_retries):
try:
response = requests.post(
endpoint,
headers=headers,
data=compressed_payload
)
# Check the HTTP status code
if response.status_code == 200:
return response, response.content
else:
self.logger.error(f"Received non-200 while sending data to high-scale-ea. Status code: {response.status_code}. Retrying in {retry_interval} seconds...")
time.sleep(retry_interval)
except Exception as e:
self.logger.exception(f"An error occurred while sending data to high-scale-ea: {str(e)}. Retrying in {retry_interval} seconds...")
time.sleep(retry_interval)
raise Exception("All retry attempts failed. Could not send data to high-scale-ea.")