You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
113 lines
2.9 KiB
113 lines
2.9 KiB
# Copyright 2016 Splunk, Inc.
|
|
# SPDX-FileCopyrightText: 2020 2020
|
|
#
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
|
|
"""
|
|
This module contains simple interfaces for file compression and decompression.
|
|
"""
|
|
|
|
import gzip
|
|
import zipfile
|
|
|
|
from io import BytesIO
|
|
|
|
|
|
# Names exported by a star-import of this module: the two handler classes defined below.
__all__ = ["GzipHandler", "ZipHandler"]
|
|
|
|
|
|
class GzipHandler:
    """
    Class for handling gzip-formatted content held in memory as bytes.
    """

    @classmethod
    def check_format(cls, data: bytes) -> bool:
        """Validate whether `data` is in gzip format.

        Only the two-byte magic number is inspected. Bytes 0 and 1 should be
        (per RFC 1952):
        data[0] = 31 (0x1f), data[1] = 139 (0x8b).

        :param data: Data to check.
        :type data: ``bytes``
        :returns: True if it is in gzip format else False.
        :rtype: ``bool``
        """

        # Slicing (instead of indexing) keeps inputs shorter than two bytes
        # from raising; they simply compare unequal and yield False.
        return data[0:2] == b"\x1f\x8b"

    @classmethod
    def decompress(cls, data: bytes) -> bytes:
        """Decompress gzip-compressed data `data`.

        It will perform basic validation (magic-number check), then return
        the decompressed data or raise ValueError for data that does not
        start with the gzip magic number.

        Data that passes the magic-number check but has a corrupt or
        truncated payload is not converted to ValueError: whatever error the
        ``gzip`` module raises while reading propagates unchanged.

        :param data: Gzip-compressed data to decompress.
        :type data: ``bytes``
        :returns: decompressed data.
        :rtype: ``bytes``

        :raises ValueError: If `data` is not in gzip format
        """

        if not cls.check_format(data):
            raise ValueError("File is not gzip format.")

        # The context manager closes the GzipFile wrapper even when reading
        # fails part-way through.
        with gzip.GzipFile(fileobj=BytesIO(data), mode="rb") as gz_file:
            return gz_file.read()
|
|
|
|
|
|
class ZipHandler:
    """
    Class for handling zip archives held in memory as bytes.

    Only archives containing exactly one member are supported by
    :meth:`decompress`.
    """

    @classmethod
    def check_format(cls, data: bytes) -> bool:
        """Validate whether `data` is in zip format.

        Delegates to :func:`zipfile.is_zipfile` on an in-memory buffer.

        :param data: Data to check.
        :type data: ``bytes``
        :returns: True if it is in zip format else False.
        :rtype: ``bool``
        """

        return zipfile.is_zipfile(BytesIO(data))

    @classmethod
    def decompress(cls, data: bytes) -> bytes:
        """Decompress zip-compressed data `data`.

        It will perform basic validation, then return the decompressed
        content of the single archive member, or raise ValueError with an
        error message.

        Errors raised while *opening* the archive (after the format check
        has passed) are not converted and propagate unchanged.

        :param data: Zip-compressed data to decompress.
        :type data: ``bytes``
        :returns: decompressed data.
        :rtype: ``bytes``

        :raises ValueError: If `data` is not in zip format, the archive is
            empty or holds more than one member, extraction fails, or the
            extracted size differs from the size recorded in the archive.
        """

        if not cls.check_format(data):
            raise ValueError("File is not zip format.")

        # The context manager guarantees the archive is closed on every
        # exit path, including the validation errors raised below.
        with zipfile.ZipFile(BytesIO(data)) as archive:
            members = archive.infolist()

            # An empty archive is still a valid zip file; report it clearly
            # instead of failing on members[0] with an IndexError.
            if not members:
                raise ValueError("Zip file does not contain any files.")

            if len(members) > 1:
                raise ValueError(
                    "Zip files containing multiple files not supported by this "
                    "handler."
                )

            member = members[0]
            try:
                content = archive.read(member)
            except Exception as exc:
                # `Exception` (not a bare except) so KeyboardInterrupt and
                # SystemExit are never swallowed; the cause is chained so
                # the real failure stays visible in the traceback.
                raise ValueError(
                    "Unknown exception when extracting zip file."
                ) from exc

        if len(content) != member.file_size:
            raise ValueError("Zip file size does not match actual size.")

        return content
|