# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
# external pkgs that are shared
import json
from collections import namedtuple
import os
import requests
# internal pkgs
from integrations.commons.logutil import get_logger
from integrations.phantom.server import Phantom
from integrations.commons.splunk.server import Splunk
from integrations.phantom.api import Resource, Container, Artifact, PhantomAPIError
from itsi.event_management.sdk.custom_group_action_base import CustomGroupActionBase
from ITOA.event_management.notable_event_ticketing import ExternalTicket
from ITOA.event_management.notable_event_utils import Audit
from integrations.commons.itsi.utils import get_notable_events, get_group_time_range
# phantom TA addon dependencies
from phantom_config import PhantomConfig, PHANTOM_KEY, TOKEN_KEY, VERIFY_KEY
# phantom splunk add-on backward compatability fix
try:
# Newer versions (>2.7) of phantom splunk add-on has two "CERT_FILE_LOCATION" variables
from phantom_instance import CERT_FILE_LOCATION_DEFAULT, CERT_FILE_LOCATION_LOCAL
except ImportError:
# Older versions (< 2.7) of phantom splunk add-on has one "CERT_FILE_LOCATION" variable
from phantom_instance import CERT_FILE_LOCATION as CERT_FILE_LOCATION_LOCAL, CERT_FILE_LOCATION as CERT_FILE_LOCATION_DEFAULT
# Module-level logger shared by the action class below.
logger = get_logger(name=__name__)
'''
The following classes and global functions contain everything needed to connect Splunk with the integration target.
This should be the only class that has connection logic. Everything else that is specific to Phantom or ITSI should go
into its respective folder, namely lib/integrations/phantom and lib/integrations/commons/itsi.
'''
class SendToPhantomAction(CustomGroupActionBase):
    '''
    ITSI custom group action that forwards an episode (notable event group) to a
    Splunk SOAR (Phantom) server.

    The episode becomes a SOAR *container* and each notable event in the episode
    becomes an *artifact* inside that container. The resulting container is then
    linked back to the episode(s) as an external ticket, and every step is
    recorded through the ITSI audit facility.
    '''

    def __init__(self, input_params):
        '''
        Validate user input, resolve the chosen SOAR server from the Phantom
        add-on configuration, and prepare the Splunk and Phantom connections.

        :param input_params: JSON string supplied by the custom-action framework;
                             expected to contain a 'result' dict with at least
                             'itsi_group_id' plus the user's action config.
        :raises ValueError: when user input is invalid, or when no configured
                            SOAR server matches the chosen server name.
        '''
        super(SendToPhantomAction, self).__init__(input_params, logger)
        self.raw_input_params = input_params
        self.input_params = json.loads(input_params)
        # Initialize Splunk server wrapper; this eases further communication.
        self.splunk_server = Splunk(self.get_session_key())
        # Initialize auditor used to record success/failure of every step.
        self.auditor = self.get_auditor(self.splunk_server)
        # Extract user-input request data and validate it before any remote call.
        self.request_data = self.get_config()
        status = self.validate_userinputs(self.request_data)
        if not status.isvalid:
            self.auditor.send_activity_to_audit({'event_id': self.input_params.get('result')['itsi_group_id']},
                                                'Failed: ' + ','.join(status.error_messages),
                                                'Send To Splunk SOAR')
            raise ValueError('Invalid User Input: ' + ','.join(status.error_messages))
        # Resolve the SOAR server whose 'custom_name' matches the user's choice.
        config = PhantomConfig("itsi.event_action.send_to_phantom", self.splunk_server.session_key)
        server_name = self.request_data.get('ph_server')
        phantom_server_settings = None
        for server_config in config[PHANTOM_KEY]:
            if server_name == config[PHANTOM_KEY][server_config]['custom_name']:
                phantom_server_settings = config[PHANTOM_KEY][server_config]
                break
        if not phantom_server_settings:
            self.auditor.send_activity_to_audit({'event_id': self.input_params.get('result')['itsi_group_id']},
                                                'Failed: Unable to find config with name matching the '
                                                'chosen phantom server name "' + server_name + '"',
                                                'Send To Splunk SOAR')
            raise ValueError('Failed: Unable to find config with name matching the chosen phantom server name "' + server_name + '"')
        phantom_host_name = phantom_server_settings['server']
        phantom_auth_token = phantom_server_settings[TOKEN_KEY]
        self.phantom_server = Phantom(phantom_auth_token, phantom_host_name)
        # SSL verification: the add-on stores 1/0; when verification is enabled
        # and the add-on ships a CA bundle on disk, point 'verify' at that file
        # (requests accepts either a boolean or a CA-bundle path).
        if VERIFY_KEY in config:
            Resource.verify_ssl_cert = config[VERIFY_KEY]
        Resource.verify_ssl_cert = True if Resource.verify_ssl_cert == 1 else False
        if Resource.verify_ssl_cert and os.path.isfile(CERT_FILE_LOCATION_LOCAL):
            Resource.verify_ssl_cert = CERT_FILE_LOCATION_LOCAL
        elif Resource.verify_ssl_cert and os.path.isfile(CERT_FILE_LOCATION_DEFAULT):
            Resource.verify_ssl_cert = CERT_FILE_LOCATION_DEFAULT

    def execute(self):
        '''
        Create a SOAR container for the episode, then create artifacts for the
        episode's notable events, then audit the outcome.

        If a container with the same source_data_identifier already exists on
        the SOAR side, the existing container is reused and only artifacts are
        added to it.

        :raises PhantomAPIError: when container creation fails for any reason
                                 other than "duplicate container".
        :raises requests.exceptions.SSLError: when SSL verification fails.
        '''
        label = self.request_data.get('ph_label')
        audit_messages = []
        audit_messages.append('Splunk SOAR Server: {}, Label:{}'.format(self.phantom_server.host_name, label))
        logger.info('Splunk SOAR Container creation about to start')
        itsi_group_id = self.input_params.get('result')['itsi_group_id']
        # The episode id doubles as source_data_identifier so SOAR can detect
        # duplicate submissions of the same episode.
        container_data = {
            'description': self.input_params.get('result').get('itsi_group_description', ''),
            'name': self.input_params.get('result').get('itsi_group_title', ''),
            'label': label,
            'source_data_identifier': itsi_group_id
        }
        container_id = None
        successes = 0
        failures = 0
        total = 0
        already_existed = 0
        try:
            result = Container.create(server=self.phantom_server, data=container_data)
        except requests.exceptions.SSLError:
            self.auditor.send_activity_to_audit({
                'event_id': itsi_group_id
            }, 'Container creation - Failed due to --> SSL Verification failed', 'Send To Splunk SOAR')
            # Bare raise preserves the original SSL error and its traceback.
            raise
        if result.item:
            logger.info('successfully created container with id ' + str(result.item.get('id')))
            # NOTE(review): 'Successfull' typo kept as-is — audit consumers may match on it.
            audit_messages.append('Container with { id: ' + str(result.item.get('id')) + ', sid: ' + str(itsi_group_id) + ' } - Successfull')
            container_id = result.item.get('id')
            total, successes, failures, already_existed = self.create_artifacts(container_id, label)
        else:
            if "duplicate with source_data_identifier" in str(result.errors.get('message')):
                # Container already exists for this episode: reuse it.
                logger.info('Container with { id:'
                            + str(result.errors['existing_container_id']) + ', sid:' + str(itsi_group_id) + ' } already exists')
                audit_messages.append('Container with { id: ' + str(result.errors['existing_container_id']) + ', sid: '
                                      + str(itsi_group_id) + ' } - Already Exists')
                container_id = result.errors['existing_container_id']
                total, successes, failures, already_existed = self.create_artifacts(container_id, label)
            else:
                logger.info('Container creation - Failed due to --> ' + result.errors.get('message'))
                self.auditor.send_activity_to_audit({
                    'event_id': itsi_group_id
                }, 'Container creation - Failed due to --> ' + result.errors.get('message'), 'Send To Splunk SOAR')
                raise PhantomAPIError('Container creation - Failed due to --> ' + result.errors.get('message'))
        # Audit a single summary line with artifact-creation statistics.
        artifact_stats_message = ' Artifact Creation Stats -> Total Notables:{total}, ' + \
                                 ' Artifacts Already Existed:{already_existed}, ' + \
                                 ' Artifacts Successfully created:{successes}, Artifact failed creations:{failures}'
        audit_messages.append(artifact_stats_message.format(total=str(total),
                                                            already_existed=str(already_existed),
                                                            successes=str(successes),
                                                            failures=str(failures)))
        self.auditor.send_activity_to_audit({'event_id': itsi_group_id}, ' | '.join(audit_messages), 'Send To Splunk SOAR')
        logger.info(' | '.join(audit_messages))

    def link_ticket(self, container_id):
        '''
        Link the SOAR container back to the episode(s) as an external ticket so
        the episode UI can show a direct link to the container.

        :param container_id: id of the SOAR container to link.
        '''
        ticket_system = 'Splunk SOAR Mission Control'
        ticket_url = self.phantom_server.get_container_url(container_id)
        ticket_id = str(container_id)
        group_ids = []
        for group in self.get_group():
            group_id = self.extract_group_or_event_id(group)
            group_ids.append(group_id)
        if len(group_ids) == 1:
            external_ticket = ExternalTicket(group_ids[0], self.get_session_key(), logger)
            external_ticket.upsert(ticket_system, ticket_id, ticket_url)
        elif len(group_ids) > 1:
            ExternalTicket.bulk_upsert(group_ids, ticket_system, ticket_id, ticket_url,
                                       self.get_session_key(), logger)
        else:
            logger.info("No associated events to upsert ticket.")

    def _read_conf_int(self, confs, property_name, default_value):
        '''
        Read an integer property from the itsi_event_action_send_to_phantom
        stanza of notable_event_actions.conf, falling back to default_value
        (with an error log) when the property is missing or not an int.

        :param confs: Splunk SDK confs collection (service.confs).
        :param property_name: name of the conf property to read.
        :param default_value: value returned when the read or int() fails.
        :return: the property value as int, or default_value.
        '''
        try:
            return int(confs['notable_event_actions']['itsi_event_action_send_to_phantom']
                       .content[property_name])
        except Exception:
            logger.error('Error - Getting ' + property_name + ' configuration property. Setting default value - '
                         + str(default_value), exc_info=True)
            return default_value

    def create_artifacts(self, container_id, label):
        '''
        Fetch the episode's notable events in pages and create one SOAR
        artifact per notable event inside the given container, then link the
        container back to the episode as an external ticket.

        :param container_id: id of the SOAR container to attach artifacts to.
        :param label: SOAR label applied to every artifact.
        :return: tuple (total, successes, failures, already_existed).
        :raises PhantomAPIError: when the episode contains no notable events.
        '''
        service = self.splunk_server.get_service()
        confs = service.confs
        input_params_result = self.input_params.get('result')
        # Tunables from notable_event_actions.conf; each falls back to a sane default.
        get_notables_search_api_page_size = self._read_conf_int(confs, 'splunk_itsi_get_notables_search_api_page_size', 50)
        artifacts_create_api_page_size = self._read_conf_int(confs, 'phantom_artifacts_create_api_page_size', 50)
        # Fix to include only the first event in the episode when sending to phantom.
        send_first_event_only = self._read_conf_int(confs, 'send_first_event_only', 1)
        if input_params_result.get('itsi_earliest_event_time'):
            itsi_earliest_event_time = float(input_params_result.get('itsi_earliest_event_time'))
        else:
            logger.info("The event does not have itsi_earliest_event_time attribute, so we go ahead without it")
            # Done to support backward compatibility for events that don't have
            # the itsi_earliest_event_time attribute.
            itsi_earliest_event_time = None
        time_range = get_group_time_range(input_params_result['start_time'],
                                          input_params_result['last_time'],
                                          input_params_result['itsi_first_event_time'],
                                          input_params_result['itsi_last_event_time'],
                                          itsi_earliest_event_time)
        logger.info('send_first_event_only = ' + str(send_first_event_only) + ' , time_range = ' + str(time_range))
        # Materialize the pages exactly once: the previous emptiness check
        # (len(list(...))) would silently exhaust a generator before iteration.
        notable_event_pages = list(get_notable_events(splunk_server=self.splunk_server,
                                                      itsi_group_id=input_params_result['itsi_group_id'],
                                                      earliest_time=time_range['earliest_time'],
                                                      latest_time=time_range['latest_time'],
                                                      page_size=get_notables_search_api_page_size,
                                                      results_max_limit=1 if send_first_event_only == 1 else None))
        if not notable_event_pages:
            group_id = input_params_result['itsi_group_id']
            raise PhantomAPIError(f'Artifact creation failed because episode is empty. Group_id={group_id}')
        successes = 0
        failures = 0
        total = 0
        already_existed = 0
        for notable_events_subset in notable_event_pages:
            total = total + len(notable_events_subset)
            # Send artifacts to SOAR in chunks of artifacts_create_api_page_size.
            count = artifacts_create_api_page_size
            page = 0
            for i in range(0, len(notable_events_subset), count):
                page = page + 1
                logger.info('Splunk SOAR page ' + str(page))
                successes_local, failures_local, already_existed_local = self.convert_notables_to_artifacts(
                    container_id,
                    notable_events_subset[i:i + count],
                    label)
                successes = successes + successes_local
                failures = failures + failures_local
                already_existed = already_existed + already_existed_local
        self.link_ticket(container_id)
        return total, successes, failures, already_existed

    def convert_notables_to_artifacts(self, container_id, notable_events, label):
        '''
        Create one SOAR artifact per notable event via a single bulk create
        call, tallying per-item outcomes from the bulk response.

        :param container_id: id of the SOAR container to attach artifacts to.
        :param notable_events: list of notable event dicts for this chunk.
        :param label: SOAR label applied to every artifact.
        :return: tuple (successes, failures, already_existed) for this chunk.
        :raises requests.exceptions.SSLError: when SSL verification fails.
        '''
        successes = 0
        failures = 0
        already_existed = 0
        artifacts = []
        for notable_event in notable_events:
            notable_event['_serial'] = 'normalized'
            artifact_data = {
                'name': notable_event['title'],
                'label': label,
                # event_id doubles as source_data_identifier so SOAR can
                # detect duplicate artifacts.
                'source_data_identifier': notable_event['event_id'],
                'container_id': container_id,
                'data': notable_event,
                'cef': notable_event
            }
            artifacts.append(artifact_data)
        try:
            result = Artifact.create(server=self.phantom_server, data=artifacts)
        except requests.exceptions.SSLError:
            self.auditor.send_activity_to_audit({
                'event_id': self.input_params.get('result')['itsi_group_id']
            }, 'Artifact creation - Failed due to --> SSL Verification failed', 'Send To Splunk SOAR')
            # Bare raise preserves the original SSL error and its traceback.
            raise
        for item in result.item:
            if item.get('success'):
                successes = successes + 1
                logger.info('successfully created artifact with id ' + str(item.get('id')))
            else:
                if "artifact already exists" in str(item.get('message')):
                    already_existed = already_existed + 1
                    logger.info('artifact with { id:' + str(item['existing_artifact_id']) + '} already exists')
                else:
                    failures = failures + 1
                    logger.error('failed creating artifact because of ' + item.get('message'))
        return successes, failures, already_existed

    def get_auditor(self, splunk_server):
        '''
        Build the ITSI audit helper bound to this session.

        :param splunk_server: Splunk server wrapper carrying the session key.
        :return: Audit instance writing to the ITSI notable audit index.
        '''
        return Audit(session_key=splunk_server.session_key, audit_token_name='Auto Generated ITSI Notable Index Audit Token')

    # Returns a namedtuple (isvalid, error_messages) after validating inputs.
    def validate_userinputs(self, input_data):
        '''
        Validate the user-supplied action configuration.

        :param input_data: dict of user inputs; only 'ph_server' is required
                           ('ph_label' defaults to "events" upstream, so it is
                           not checked here).
        :return: Status namedtuple with fields isvalid (bool) and
                 error_messages (list of str).
        '''
        Status = namedtuple('Status', ['isvalid', 'error_messages'])
        validation_messages = []
        if not input_data.get('ph_server'):
            validation_messages.append('Splunk SOAR Server field is invalid')
        # Label field defaults to events so no need to check.
        if not validation_messages:
            return Status(True, [])
        else:
            return Status(False, validation_messages)