You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
129 lines
4.6 KiB
129 lines
4.6 KiB
"""
|
|
(C) 2019 Splunk Inc. All rights reserved.
|
|
"""
|
|
import sys
|
|
from cloudgateway import py23
|
|
import base64
|
|
import collections
|
|
|
|
"""Wrapper Credentials Bundle object to store a session token and username which are returned by the
|
|
server side on registration
|
|
"""
|
|
# Immutable record of the credentials handed back on registration. Every field
# is optional and falls back to None, so callers may populate only what they have.
CredentialsBundle = collections.namedtuple(
    'CredentialsBundle',
    'session_token username deployment_name server_type token_type token_expires_at env_metadata'
)
# namedtuple(defaults=...) is Python 3.7+ only; assigning __defaults__ on __new__
# gives the same "all fields default to None" behaviour on older interpreters too.
CredentialsBundle.__new__.__defaults__ = tuple([None] * len(CredentialsBundle._fields))

# Pairs a serialized metadata payload with its identifier; both fields are required.
EnvironmentMetadata = collections.namedtuple(
    'EnvironmentMetadata',
    'serialized_metadata id'
)
|
|
|
|
|
|
def make_device_id(encryption_context, sign_public_key):
    """
    Derive a device id from a signing public key.

    :param encryption_context: object exposing ``generichash_raw(data)``
    :param sign_public_key: the device's public signing key
    :return: whatever ``encryption_context.generichash_raw`` returns for the key
    """
    device_id = encryption_context.generichash_raw(sign_public_key)
    return device_id
|
|
|
|
|
|
class DeviceInfo(object):
    """
    Helper class to encapsulate a client device's credentials that are returned by cloudgateway when we initiate
    the registration process
    """

    def __init__(self, encrypt_public_key, sign_public_key, device_id="", confirmation_code="", app_id="",
                 client_version="", app_name="", platform=""):
        """
        :param encrypt_public_key: binary value
        :param sign_public_key: binary value
        :param device_id: binary value
        :param confirmation_code: string
        :param app_id: string
        :param client_version: string
        :param app_name: stored as given
        :param platform: stored as given
        """
        self.encrypt_public_key = encrypt_public_key  # binary value
        self.sign_public_key = sign_public_key  # binary value
        self.device_id = device_id  # binary value
        self.confirmation_code = confirmation_code  # string
        self.app_id = app_id  # string
        self.client_version = client_version  # string
        self.app_name = app_name
        self.platform = platform

    def __repr__(self):
        return str(self.__dict__)

    @staticmethod
    def _b64_text(value):
        """
        Base64-encode a binary value and return it in the interpreter's native string type.

        :param value: bytes to encode; an empty text string (the constructor default for
                      device_id) is treated as empty bytes
        :return: base64 representation (``str`` on both Python 2 and Python 3)
        """
        # The constructor defaults device_id to "" which is text on Python 3;
        # base64.b64encode would reject it with a TypeError, so map it to b"".
        if isinstance(value, str) and not value:
            value = b""

        encoded = base64.b64encode(value)
        if sys.version_info < (3, 0):
            # Python 2: b64encode already yields a native str
            return encoded
        # Python 3: b64encode yields bytes, which are not JSON-serializable
        return encoded.decode('ascii')

    def to_json(self):
        """
        Build a JSON-serializable dict for this device, with binary fields base64-encoded.

        NOTE(review): ``platform`` is not part of the serialized form - confirm whether that is intended.

        :return: dict with keys encrypt_public_key, sign_public_key, device_id, conf_code, app_id,
                 client_version and app_name
        """
        encrypt_public_key = self._b64_text(self.encrypt_public_key)
        sign_public_key = self._b64_text(self.sign_public_key)
        device_id = self._b64_text(self.device_id)

        return {
            'encrypt_public_key': encrypt_public_key,
            'sign_public_key': sign_public_key,
            'device_id': device_id,
            'conf_code': self.confirmation_code,
            'app_id': self.app_id,
            'client_version': self.client_version,
            'app_name': self.app_name
        }

    @staticmethod
    def from_json(jsn):
        """
        Rebuild a DeviceInfo from the dict produced by ``to_json``.

        :param jsn: mapping holding the keys emitted by ``to_json``; ``app_name`` may be absent
                    and then falls back to the constructor default
        :return: DeviceInfo with the binary fields base64-decoded
        :raises KeyError: if any key other than ``app_name`` is missing
        """
        # app_name is emitted by to_json, so it must be restored here for a lossless
        # round trip; payloads without the key keep the previous default of "".
        app_name = jsn['app_name'] if 'app_name' in jsn else ""

        return DeviceInfo(
            base64.b64decode(jsn['encrypt_public_key']),
            base64.b64decode(jsn['sign_public_key']),
            base64.b64decode(jsn['device_id']),
            jsn['conf_code'],
            jsn['app_id'],
            jsn['client_version'],
            app_name,
        )
|
|
|
|
|
|
class EncryptionKeys(object):
    """
    Data class to encapsulate public and private keys needed to communicate with a device over cloud gateway
    """

    def __init__(self, sign_public_key, sign_private_key, encrypt_public_key, encrypt_private_key):
        """
        Store the signing and encryption key pairs as given.

        :param sign_public_key: public half of the signing key pair
        :param sign_private_key: private half of the signing key pair
        :param encrypt_public_key: public half of the encryption key pair
        :param encrypt_private_key: private half of the encryption key pair
        """
        self.sign_public_key = sign_public_key
        self.sign_private_key = sign_private_key
        self.encrypt_public_key = encrypt_public_key
        self.encrypt_private_key = encrypt_private_key

    def __repr__(self):
        # NOTE(review): this string includes the private keys - take care when logging instances.
        return str(vars(self))

    def to_json(self):
        """
        Build a JSON-serializable dict with each key base64-encoded.

        :return: dict keyed by encrypt_public_key, encrypt_private_key, sign_public_key, sign_private_key
        """
        is_py2 = sys.version_info < (3, 0)
        serialized = {}
        for field in ('encrypt_public_key', 'encrypt_private_key', 'sign_public_key', 'sign_private_key'):
            encoded = base64.b64encode(getattr(self, field))
            # Python 3 yields bytes from b64encode; convert to text so the dict can be dumped as JSON
            serialized[field] = encoded if is_py2 else encoded.decode('ascii')
        return serialized

    @staticmethod
    def from_json(jsn):
        """
        Rebuild an EncryptionKeys instance from the dict produced by ``to_json``.

        :param jsn: mapping holding the four base64-encoded keys
        :return: EncryptionKeys with every key base64-decoded
        """
        sign_public = base64.b64decode(jsn['sign_public_key'])
        sign_private = base64.b64decode(jsn['sign_private_key'])
        encrypt_public = base64.b64decode(jsn['encrypt_public_key'])
        encrypt_private = base64.b64decode(jsn['encrypt_private_key'])
        return EncryptionKeys(sign_public, sign_private, encrypt_public, encrypt_private)
|