import os
import pickle
import time
import numpy as np
import json
import pandas as pds
import recommendation_task.recommendation_consts as const
import ml_utils
from splunklib.six.moves import range
logger = ml_utils.get_logger()
MODEL_PATH = os.path.join(const.APP_BIN_PATH, 'machine_learning_mod', 'model')
CLOUDWATCH_CNT_NAME = ['CPUUtilization_count', 'NetworkIn_count', 'NetworkOut_count', 'DiskReadBytes_count',
                       'DiskWriteBytes_count']
CLOUDWATCH_METRICS = 'metric_name'
CLOUDWATCH_COUNT = 'count'
DAY_SECONDS = 86400
METRIC_CNT = 5
CNT_THRESHOLD = 1. / 24
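# The trained model is pickled to MODEL_PATH; load_model() returns None when no saved
# model exists yet (or unpickling fails), presumably so the caller can fall back to
# training a fresh model.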
def save_model(model):
    with open(MODEL_PATH, 'wb') as file:
        pickle.dump(model, file)
def load_model():
    try:
        with open(MODEL_PATH, 'rb') as file:
            return pickle.load(file)
    except Exception:
        return None
# parse a string into a float clamped to [min_value, max_value]; return default_value if parsing fails
def parse_str_value(str, default_value, min_value, max_value):
    try:
        return min(max(float(str), min_value), max_value)
    except (TypeError, ValueError):
        return default_value
# get accepted feedbacks of current dimension
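# "Last day" here means since the start of the current UTC day: timestamps are floored
# to a DAY_SECONDS boundary before comparison.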
def get_accepted_resources(feedback_list):
    accepted_resource_list = []
    last_day_timestamp = int(time.time()) // DAY_SECONDS * DAY_SECONDS
    for feedback_data in feedback_list:
        # 1. feedback is 'action'; 2. dimension is EC2 upgrade/downgrade; 3. time is within the last day
        if feedback_data['feedback'] == 'action' and feedback_data['ml_dimension'] == const.EC2_DYNAMIC_UP_DOWN and \
                feedback_data['timestamp'] >= last_day_timestamp:
            accepted_resource_list.append(feedback_data['resource_id'])
    logger.info('Instances (%s) have just been accepted and will not be recommended later.' % json.dumps(accepted_resource_list))
    return accepted_resource_list
# transform cloudwatch list format, to fit machine learning algorithm
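# raw per-metric records are grouped by (resource_id, time); each output row is
# [resource_id, <metrics in const.CLOUDWATCH_METRICS_NAMES order, 0 where missing>, time]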
def format_cloudwatch_list(cloudwatch_list):
    instance_cloudwatch_map = {}
    # for i in range(len(cloudwatch_list)):
    #     cloudwatch_data = cloudwatch_list.loc[i]
    for cloudwatch_data in cloudwatch_list:
        resource_id = cloudwatch_data[const.INPUT_INSTANCE]
        time = cloudwatch_data[const.INPUT_TIME]
        key = '%s#%d' % (resource_id, time)
        if key not in instance_cloudwatch_map:
            instance_cloudwatch_map[key] = {}
        metric_name = cloudwatch_data[CLOUDWATCH_METRICS]
        avg_value_name = metric_name + '_avg_value'
        max_value_name = metric_name + '_max_value'
        metric_count_name = metric_name + '_count'
        instance_cloudwatch_map[key][avg_value_name] = cloudwatch_data[avg_value_name]
        instance_cloudwatch_map[key][max_value_name] = cloudwatch_data[max_value_name]
        instance_cloudwatch_map[key][metric_count_name] = cloudwatch_data[CLOUDWATCH_COUNT]
    formatted_cloudwatch_list = []
    for key in instance_cloudwatch_map:
        (resource_id, time) = key.split('#')
        cloudwatch_array = [resource_id]
        for feature_name in const.CLOUDWATCH_METRICS_NAMES:
            if feature_name in instance_cloudwatch_map[key]:
                cloudwatch_array.append(instance_cloudwatch_map[key][feature_name])
            else:
                cloudwatch_array.append(0)
        cloudwatch_array.append(int(time))
        formatted_cloudwatch_list.append(cloudwatch_array)
    return formatted_cloudwatch_list
# transform json list format, to fit machine learning algorithm
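# missing metrics are filled with NaN (a one-element [NaN] list for the feature column
# const.INPUT_FEATURE, which presumably holds a list of feature values per row)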
def format_json_list(list, metric_names):
    formatted_list = []
    for data in list:
        array = []
        for metric_name in metric_names:
            if metric_name in data:
                array.append(data[metric_name])
            elif metric_name == const.INPUT_FEATURE:
                array.append([np.nan])
            else:
                array.append(np.nan)
        formatted_list.append(array)
    return formatted_list
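# Builds one feature row per instance with METRIC_CNT * 2 * const.CLOUDWATCH_DAYS values:
# for each metric, CLOUDWATCH_DAYS daily max values followed by CLOUDWATCH_DAYS daily
# average values. Days whose sample count falls below cnt_threshold (CNT_THRESHOLD of the
# largest observed count) keep the metric's overall max/mean as a fallback, and instances
# with no more than minimum_sample_days well-sampled days are dropped.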
def cloudwatch_clean(cloudwatch, minimum_sample_days):
    column_names = const.CLOUDWATCH_METRICS_NAMES[:]
    column_names.append(const.INPUT_TIME)
    column_names.insert(0, const.INPUT_INSTANCE)
    cloudwatch = pds.DataFrame(cloudwatch, columns=column_names)
    cloudwatch = cloudwatch.sort_values(by=[const.INPUT_TIME], ascending=False)
    max_time = cloudwatch[const.INPUT_TIME].max() // DAY_SECONDS
    cnt_threshold = 0
    if len(cloudwatch) != 0:
        cnt_threshold = int(cloudwatch[CLOUDWATCH_CNT_NAME].values.max() * CNT_THRESHOLD)
    instances = cloudwatch[const.INPUT_INSTANCE].drop_duplicates()
    valid_instances = []
    features = []
    for ins in instances:
        cur_ins_feature = cloudwatch[cloudwatch[const.INPUT_INSTANCE] == ins][const.CLOUDWATCH_METRICS_NAMES]
        feature_indexes = max_time - cloudwatch[cloudwatch[const.INPUT_INSTANCE] == ins][
            const.INPUT_TIME].values // DAY_SECONDS
        if len(feature_indexes) == 0:
            continue
        result = list(range(METRIC_CNT * 2 * const.CLOUDWATCH_DAYS))
        no_enough = False  # not enough samples to construct a valid feature row
        for i in range(METRIC_CNT):
            max_values = cur_ins_feature[const.CLOUDWATCH_METRICS_NAMES[i * 3]].values
            avg_values = cur_ins_feature[const.CLOUDWATCH_METRICS_NAMES[i * 3 + 1]].values
            metric_cnt = cur_ins_feature[const.CLOUDWATCH_METRICS_NAMES[i * 3 + 2]].values
            no_enough_index = np.where(metric_cnt < cnt_threshold)[0]
            if len(max_values) - len(no_enough_index) <= minimum_sample_days:
                no_enough = True
                break
            else:
                # fill metric i's max block and avg block with overall fallbacks first
                result[2 * i * const.CLOUDWATCH_DAYS:(2 * i + 1) * const.CLOUDWATCH_DAYS] = \
                    [np.max(max_values)] * const.CLOUDWATCH_DAYS
                result[(2 * i + 1) * const.CLOUDWATCH_DAYS:(2 * i + 2) * const.CLOUDWATCH_DAYS] = \
                    [np.mean(avg_values)] * const.CLOUDWATCH_DAYS
                # then overwrite the slots of days that have enough samples with real values
                start_index = 0
                for j in range(len(feature_indexes)):
                    if start_index >= len(no_enough_index) or j != no_enough_index[start_index]:
                        result[2 * i * const.CLOUDWATCH_DAYS + feature_indexes[j]] = max_values[j]
                        result[(2 * i + 1) * const.CLOUDWATCH_DAYS + feature_indexes[j]] = avg_values[j]
                    else:
                        start_index += 1
        if not no_enough:
            features.append(result)
            valid_instances.append(ins)
    logger.info("Clean cloudwatch data: initial_instances_set_len=%d, valid_instances_set_len=%d." % (len(instances), len(valid_instances)))
    return valid_instances, features
# dedup feedback based on resource_id (instance_id) and timestamp
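# only the earliest feedback entry for each instance is kept; later duplicates are dropped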
def dedup_feedback(feedback):
    feedback = feedback.sort_values(by=[const.INPUT_TIME], ascending=True).reset_index(drop=True)  # sort feedback in ascending timestamp
    deleted_row = []
    instances_id_set = set([])
    for i in range(len(feedback)):
        instance_id = feedback.loc[i, const.INPUT_INSTANCE]
        if instance_id in instances_id_set:
            deleted_row.append(i)
        else:
            instances_id_set.add(instance_id)
    return feedback.drop(feedback.index[deleted_row]).reset_index(drop=True)
# use feedback and yesterday's recommendation results to form updated dataset
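# Labels: a score is negated for downgrade recommendations and, for rows with feedback,
# scaled by a per-feedback-type coefficient (IGNORE/READ/ACTION); yesterday's
# recommendations that received no feedback keep their original score. Rows containing
# NaN features or labels are dropped before returning (x, y).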
def update_dataset(feedback, recommendation):
    feedback = pds.DataFrame(feedback, columns=const.FEEDBACK_METRICS_NAMES)
    feedback = dedup_feedback(feedback)
    recommendation = pds.DataFrame(recommendation, columns=const.RECOMMENDATION_METRICS_NAMES)
    recommendation_with_no_feedback = [recommendation[const.RECOMMENDATION_RECOMMENDATION_ID][i] not in
                                       feedback[const.FEEDBACK_RECOMMENDATION_ID].values for i in
                                       range(len(recommendation))]
    recommendation = recommendation[recommendation_with_no_feedback]
    for i in range(len(feedback)):
        if feedback.loc[i, const.RECOMMENDATION_ACTION] == const.DOWNGRADE_ACTION:
            feedback.loc[i, const.RECOMMENDATION_SCORE] *= -1
        if feedback.loc[i, const.FEEDBACK_ACTION] == const.IGNORE_ACTION:
            feedback.loc[i, const.RECOMMENDATION_SCORE] *= const.IGNORE_COEFFS
        elif feedback.loc[i, const.FEEDBACK_ACTION] == const.READ_ACTION:
            feedback.loc[i, const.RECOMMENDATION_SCORE] *= const.READ_COEFFS
        else:
            feedback.loc[i, const.RECOMMENDATION_SCORE] *= const.ACTION_COEFFS
    x = np.array(feedback[const.INPUT_FEATURE].values.tolist(), dtype=float)
    y = np.array(feedback[const.RECOMMENDATION_SCORE].values.tolist(), dtype=float)
    if len(recommendation) != 0:
        recommendation_y = np.array(recommendation[const.RECOMMENDATION_SCORE].values.tolist(), dtype=float)
        recommendation_x = np.array(recommendation[const.INPUT_FEATURE].values.tolist(), dtype=float)
        nega_index = np.where(recommendation[const.RECOMMENDATION_ACTION] == const.DOWNGRADE_ACTION)[0]
        recommendation_y[nega_index] *= -1
        y = np.append(y, recommendation_y, axis=0)
        x = np.append(x, recommendation_x, axis=0)
    y_index = set(np.where(np.isnan(y))[0].tolist())
    x_index = set([i for i in range(len(x)) if np.isnan(x[i]).sum() >= 1])
    index = list(x_index | y_index)
    return np.delete(x, index, axis=0), np.delete(y, index, axis=0)