You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
237 lines
6.3 KiB
237 lines
6.3 KiB
# Copyright (C) 2005-2025 Splunk Inc. All Rights Reserved.
|
|
import os
|
|
import errno
|
|
import zipfile
|
|
import glob
|
|
import shutil
|
|
import json
|
|
|
|
from ITOA.setup_logging import logger
|
|
|
|
|
|
class FileManager(object):
    """
    Static helpers for file and directory operations: existence checks,
    create/delete, zip/unzip, JSON read/write and rolling file names.
    """
    # Separator inserted between a file's stem and its rolling number.
    DELIMITER = "___"
    # Extension appended when zipping and stripped when listing zip files.
    _ZIP_EXTENSION = ".zip"

    @staticmethod
    def delete_file(path):
        """
        Delete the file at the path provided.

        Failures are logged as a warning and are NOT re-raised.

        @type path: basestring
        @param path: path of the file to delete

        @return: None
        """
        try:
            os.remove(path)
        except OSError as e:
            # Best effort: a missing or undeletable file is only reported.
            logger.warning(e)

    @staticmethod
    def is_file(path):
        """
        Check whether the path is an existing regular file.

        @type path: basestring
        @param path: path to check

        @rtype: bool
        @return: True or False
        """
        return os.path.isfile(path)

    @staticmethod
    def is_directory(path):
        """
        Check whether the path is an existing directory.

        @type path: basestring
        @param path: path to check

        @rtype: bool
        @return: True or False
        """
        return os.path.isdir(path)

    @staticmethod
    def is_exists(path):
        """
        Check whether the path exists.

        @type path: basestring
        @param path: path to check

        @rtype: bool
        @return: True or False
        """
        return os.path.exists(path)

    @staticmethod
    def get_base_dir(file):
        """
        Get base dir of given file. If a directory is passed then return it.

        @type file: basestring or None
        @param file: file path

        @rtype: basestring or None
        @return: the containing directory when a file path is passed, the
            path itself when a directory is passed, the current working
            directory when None is passed, and None when the path is
            neither an existing file nor an existing directory
        """
        # None must be handled first: os.path.isfile(None) raises TypeError,
        # which would make this branch unreachable.
        if file is None:
            return os.getcwd()
        if os.path.isfile(file):
            return os.path.dirname(file)
        if os.path.isdir(file):
            return file
        return None

    @staticmethod
    def create_directory(path):
        """
        Create a directory, including any missing intermediate directories.

        An already existing path is not treated as an error.

        @type path: basestring
        @param path: directory path

        @return: None
        @raise OSError: when creation fails for a reason other than the
            path already existing
        """
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno != errno.EEXIST:
                logger.exception(e)
                # Bare raise keeps the original traceback.
                raise
        else:
            logger.debug("Successfully create directory, path=%s", path)

    @staticmethod
    def zip_directory(root_path, name_of_zip_file):
        """
        Zip the files found under a directory.

        Archive member names are relative to the parent of root_path, so
        they start with the directory's own name. Only files are added;
        directories that contain no files do not appear in the archive.

        @type root_path: basestring
        @param root_path: path of the directory to zip

        @type name_of_zip_file: basestring
        @param name_of_zip_file: zip file name without the .zip extension;
            a relative name is resolved against the parent directory of
            root_path

        @return: None
        @raise Exception: any failure is logged and re-raised
        """
        try:
            # Work with absolute paths instead of os.chdir(): changing the
            # working directory is process-wide and was never restored.
            # abspath also drops a trailing separator, which would otherwise
            # make the directory name empty.
            source_dir = os.path.abspath(root_path)
            parent_dir = os.path.dirname(source_dir)
            # os.path.join returns the second argument unchanged when it is
            # absolute, so absolute zip names keep working.
            zip_path = os.path.join(
                parent_dir, name_of_zip_file + FileManager._ZIP_EXTENSION)
            with zipfile.ZipFile(zip_path,
                                 "w",
                                 zipfile.ZIP_DEFLATED,
                                 allowZip64=True) as zf:
                for root, _, filenames in os.walk(source_dir):
                    for name in filenames:
                        full_path = os.path.join(root, name)
                        zf.write(full_path,
                                 os.path.relpath(full_path, parent_dir))
        except Exception as exc:
            logger.exception(exc)
            raise

    @staticmethod
    def unzip_backup(path_to_zip_file, extract_to_path):
        """
        Extract every member of the backup zip file into extract_to_path.

        @type path_to_zip_file: basestring
        @param path_to_zip_file: path to zip file including .zip extension

        @type extract_to_path: basestring
        @param extract_to_path: path to extract to

        @return: None
        """
        # NOTE(review): members are extracted without inspection - confirm
        # that callers only pass archives from a trusted source.
        # The context manager closes the archive even if extraction fails.
        with zipfile.ZipFile(path_to_zip_file, 'r') as zip_ref:
            zip_ref.extractall(extract_to_path)

    @staticmethod
    def delete_working_directory(path):
        """
        Delete a directory tree recursively.

        @type path: basestring
        @param path: directory path

        @return: None
        @raise OSError: logged and re-raised when removal fails
        """
        try:
            shutil.rmtree(path)
        except OSError as ose:
            logger.exception(ose)
            raise

    @staticmethod
    def write_to_file(file_path, data, flag='w+'):
        """
        Serialize data as JSON and write it to file_path.

        @type file_path: basestring
        @param file_path: path of the file to write

        @type data: dict
        @param data: JSON-serializable data to write

        @type flag: basestring
        @param flag: mode used to open the file

        @return: None
        """
        with open(file_path, flag) as fp:
            # One write call; writelines() on a string would emit the
            # document one character at a time.
            fp.write(json.dumps(data))

    @staticmethod
    def read_data(file_path, flag='r'):
        """
        Read the given file and return its parsed JSON content.

        @type file_path: basestring
        @param file_path: path of the file to read

        @type flag: basestring
        @param flag: mode used to open the file

        @rtype: json dict
        @return: parsed JSON content
        """
        with open(file_path, flag) as fp:
            return json.load(fp)

    @staticmethod
    def clean_file(file_path):
        """
        Delete the content of the file if the file exists.

        A failure to truncate is logged and NOT re-raised.

        @type file_path: basestring
        @param file_path: file path

        @return: None
        """
        if not os.path.exists(file_path):
            return
        try:
            # Opening in "w" mode truncates the file to zero length.
            with open(file_path, "w"):
                pass
        except Exception as exc:
            # Log the exception itself: exc.args[0] is only the errno number
            # for an OSError and does not exist when args is empty.
            logger.error(exc)
            logger.info("Failed to clean existing file. Will append data to existing file.")

    @staticmethod
    def get_rolling_file_name(file_path, rolling_file_number=0):
        """
        Get rolling file name.

        For example ('/tmp/foo.txt', 2) returns /tmp/foo___2.txt, where ___
        is the DELIMITER string. The number is inserted before the last "."
        of the file name; when the name has no ".", it is appended at the
        end (('/tmp/foo', 2) returns /tmp/foo___2).

        @type file_path: basestring
        @param file_path: file path

        @type rolling_file_number: integer
        @param rolling_file_number: rolling file number

        @rtype: basestring
        @return: file name
        """
        basedir = os.path.dirname(file_path)
        basefilename = os.path.basename(file_path)
        dot_index = basefilename.rfind(".")
        if dot_index == -1:
            # No extension: slicing with -1 would cut off the last character
            # of the name and move it behind the rolling number.
            stem, extension = basefilename, ""
        else:
            stem, extension = basefilename[:dot_index], basefilename[dot_index:]
        tmp_file = stem + FileManager.DELIMITER + str(rolling_file_number) + extension
        return os.path.join(basedir, tmp_file)

    @staticmethod
    def get_zip_file_names(directory_path):
        """
        Get the names, without the .zip extension, of the files matching
        *.zip in the directory provided by directory_path.

        @type directory_path: basestring
        @param directory_path: directory to search

        @rtype: list or None
        @return: list of file names without extension, or None when no file
            matches
        """
        extension = FileManager._ZIP_EXTENSION
        file_paths = glob.glob(os.path.join(directory_path, '*' + extension))
        if not file_paths:
            return None
        # Strip only the trailing extension; splitting on ".zip" breaks for
        # names that contain ".zip" elsewhere (e.g. "my.zipper.zip").
        return [os.path.basename(fpath)[:-len(extension)] for fpath in file_paths]
|