You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

129 lines
4.6 KiB

"""
(C) 2019 Splunk Inc. All rights reserved.
"""
import sys
from cloudgateway import py23
import base64
import collections
"""Wrapper Credentials Bundle object to store a session token and username which are returned by the
server side on registration
"""
# Every CredentialsBundle field is optional: the defaults tuple below has one
# None per field, so callers may supply any leading subset of values.
CredentialsBundle = collections.namedtuple(
    'CredentialsBundle',
    [
        'session_token',
        'username',
        'deployment_name',
        'server_type',
        'token_type',
        'token_expires_at',
        'env_metadata',
    ],
)
CredentialsBundle.__new__.__defaults__ = tuple(None for _ in CredentialsBundle._fields)

# Two-field record; unlike CredentialsBundle, both fields are required.
EnvironmentMetadata = collections.namedtuple(
    'EnvironmentMetadata',
    ['serialized_metadata', 'id'],
)
def make_device_id(encryption_context, sign_public_key):
    """
    Build a device id from a signing public key.

    :param encryption_context: object exposing a ``generichash_raw`` method
    :param sign_public_key: key value handed to ``generichash_raw`` unchanged
    :return: whatever ``encryption_context.generichash_raw`` returns for the key
    """
    device_id = encryption_context.generichash_raw(sign_public_key)
    return device_id
class DeviceInfo(object):
    """
    Helper class to encapsulate a client device's credentials that are returned by cloudgateway when we initiate
    the registration process
    """

    def __init__(self, encrypt_public_key, sign_public_key, device_id="", confirmation_code="", app_id="",
                 client_version="", app_name="", platform=""):
        """
        :param encrypt_public_key: binary value
        :param sign_public_key: binary value
        :param device_id: binary value (defaults to an empty string)
        :param confirmation_code: string
        :param app_id: string
        :param client_version: string
        :param app_name:
        :param platform:
        """
        self.encrypt_public_key = encrypt_public_key  # binary value
        self.sign_public_key = sign_public_key  # binary value
        self.device_id = device_id  # binary value
        self.confirmation_code = confirmation_code  # string
        self.app_id = app_id  # string
        self.client_version = client_version  # string
        self.app_name = app_name
        self.platform = platform

    def __repr__(self):
        return str(self.__dict__)

    @staticmethod
    def _b64_text(raw):
        """
        Base64-encode ``raw`` and return the result as the interpreter's native ``str``.

        :param raw: binary value to encode; an empty ``str`` is accepted and encodes to ``''``
        :return: base64 text
        """
        if isinstance(raw, str) and not raw:
            # device_id defaults to "" in __init__; on Python 3 that is text, which
            # base64.b64encode rejects with TypeError. Empty input encodes to "".
            return ""
        encoded = base64.b64encode(raw)
        if sys.version_info < (3, 0):
            # Python 2: b64encode already returns a native str.
            return encoded
        return encoded.decode('ascii')

    def to_json(self):
        """
        Serialize this object to a dict of JSON-friendly values.

        The three binary fields are base64-encoded. ``platform`` is not emitted;
        the key set is kept as-is so existing consumers see the same shape.

        :return: dict with keys encrypt_public_key, sign_public_key, device_id, conf_code, app_id,
                 client_version and app_name
        """
        return {
            'encrypt_public_key': self._b64_text(self.encrypt_public_key),
            'sign_public_key': self._b64_text(self.sign_public_key),
            'device_id': self._b64_text(self.device_id),
            'conf_code': self.confirmation_code,
            'app_id': self.app_id,
            'client_version': self.client_version,
            'app_name': self.app_name
        }

    @staticmethod
    def from_json(jsn):
        """
        Rebuild a DeviceInfo from a dict shaped like the output of ``to_json``.

        :param jsn: mapping holding base64 text for the binary fields plus conf_code, app_id and
                    client_version; ``app_name`` and ``platform`` are optional and default to ''
        :return: DeviceInfo
        :raises KeyError: if one of the required keys is missing
        """
        return DeviceInfo(
            base64.b64decode(jsn['encrypt_public_key']),
            base64.b64decode(jsn['sign_public_key']),
            base64.b64decode(jsn['device_id']),
            jsn['conf_code'],
            jsn['app_id'],
            jsn['client_version'],
            # to_json writes app_name, so restore it; .get keeps payloads without it loadable.
            jsn.get('app_name', ''),
            jsn.get('platform', ''),
        )
class EncryptionKeys(object):
    """
    Data class holding the signing and encryption key pairs (public and private) needed to
    communicate with a device over cloud gateway
    """

    def __init__(self, sign_public_key, sign_private_key, encrypt_public_key, encrypt_private_key):
        """
        :param sign_public_key:
        :param sign_private_key:
        :param encrypt_public_key:
        :param encrypt_private_key:
        """
        # Assignment order is kept so that __repr__ lists the attributes in the same order.
        self.sign_public_key = sign_public_key
        self.sign_private_key = sign_private_key
        self.encrypt_public_key = encrypt_public_key
        self.encrypt_private_key = encrypt_private_key

    def __repr__(self):
        return str(vars(self))

    def to_json(self):
        """
        Serialize the four keys to a dict of base64 strings.

        :return: dict keyed by encrypt_public_key, encrypt_private_key, sign_public_key, sign_private_key
        """
        is_py2 = sys.version_info < (3, 0)
        serialized = {}
        for attr in ('encrypt_public_key', 'encrypt_private_key', 'sign_public_key', 'sign_private_key'):
            b64_value = base64.b64encode(getattr(self, attr))
            # Python 2's b64encode already yields a native str; Python 3 yields bytes.
            serialized[attr] = b64_value if is_py2 else b64_value.decode('ascii')
        return serialized

    @staticmethod
    def from_json(jsn):
        """
        Rebuild an EncryptionKeys from a dict shaped like the output of ``to_json``.

        :param jsn: mapping with base64 values under the four key names
        :return: EncryptionKeys
        """
        # Listed in constructor-argument order.
        ctor_order = ('sign_public_key', 'sign_private_key', 'encrypt_public_key', 'encrypt_private_key')
        raw_keys = [base64.b64decode(jsn[name]) for name in ctor_order]
        return EncryptionKeys(*raw_keys)