from datetime import datetime
import numbers

import numpy as np
import pandas as pd
from scipy.special import expit
from scipy.stats import iqr
from statsmodels.nonparametric.smoothers_lowess import lowess

from util.dev_util import compare_versions
from io import open
from util.setup_logging import get_logger
from six.moves import range

logger = get_logger(__name__)

pd.options.mode.chained_assignment = 'raise'

EPSILON = 1e-7

DEFAULT_Z = 3.3
DEFAULT_SENSITIVITY_MULTIPLIER = 3.0
DEFAULT_IQR_MULTIPLIER = 1.5
DEFAULT_QUANTILE = [99.5, 0.5]

THRESH_SUPER_SPIKE = 3.0
THRESH_SUPER_SPIKE_REGULARITY = 7

ALGO_STD = 'stdev'
ALGO_QUANTILE = 'quantile'
ALGO_RANGE = 'range'
ALGO_PERCENT = 'percentage'
ALGO_IQR = 'iqr'

COL_TIMESTAMP = 'timestamp'
COL_KPI_ID = 'kpi_id'
COL_SERVICE_ID = 'service_id'
COL_DATE = 'date'
COL_VALUE = 'value'
COL_HOUR = 'HourOfDay'
COL_DAY_OF_WEEK = 'DayOfWeek'

DAY_OF_WEEK_NAME = {0: 'Mon', 1: 'Tue', 2: 'Wed', 3: 'Thu', 4: 'Fri', 5: 'Sat', 6: 'Sun'}

NAB_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'
ITSI_TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.000%z'
ITSI_TIMESTAMP_FORMAT_PD = '%Y-%m-%d %H:%M:%S%z'

COL_BND_LOW = 'bnd_low'
COL_BND_UP = 'bnd_up'
COL_ITSI_BND_LOW = 'itsi_bnd_low'
COL_ITSI_BND_UP = 'itsi_bnd_up'
COL_ANOMALY_LABEL = 'anomaly_label'  # binary 0-1 label
COL_ANOMALY_SCORE = 'anomaly_score'  # anomaly score, expressed as the distance from a data point to the anomaly boundary
COL_EDGE_MASK = 'edge_mask'  # binary 0-1 mask to mitigate false alarms at the edges of segments


class NotEnoughDataException(Exception):
    pass


def sigmoid(x, trans=1):
    """
    Use the sigmoid function to map x into [0, 1].
    Use the default value 1 for trans so that x=1 maps to 0.5.
    """
    # Use the expit function from scipy, because it is more robust and efficient:
    # https://stackoverflow.com/questions/21106134/numpy-pure-functions-for-performance-caching
    return expit(x - trans)


SPL_DEFAULT_TIME_COL = '_time'
SPL_VAL_COL = {'count', 'value', 'alert_value'}


def is_time_column(df, i):
    if df.columns[i] == SPL_DEFAULT_TIME_COL:
        return True
    sample = df[df.columns[i]].iloc[0]
    try:
        _ = pd.to_datetime(sample)
        return True
    except Exception:
        return False


def is_val_column(df, i):
    if df.columns[i] in SPL_VAL_COL:
        return True
    sample = df[df.columns[i]].iloc[0]
    if isinstance(sample, numbers.Number):
        return True
    return False


def identify_time_and_value_columns(df):
    if len(df) < 1:
        raise RuntimeError('the dataframe has no rows')
    cnt_columns = len(df.columns)
    if cnt_columns < 2:
        raise RuntimeError('the dataframe has fewer than 2 columns')
    if cnt_columns == 2:
        if df.columns[1].find('time') >= 0:
            # handle the case of some CSV files having the time and value columns in the opposite order
            df.columns = [COL_VALUE, COL_TIMESTAMP]
        else:
            df.columns = [COL_TIMESTAMP, COL_VALUE]
        return df
    time_col_idx = -1
    val_col_idx = -1
    for i in range(cnt_columns):
        if time_col_idx < 0 and is_time_column(df, i):
            time_col_idx = i
        if val_col_idx < 0 and is_val_column(df, i):
            val_col_idx = i
        if time_col_idx >= 0 and val_col_idx >= 0:
            break
    if time_col_idx >= 0 and val_col_idx >= 0:
        time_col = df.columns[time_col_idx]
        val_col = df.columns[val_col_idx]
        df = df[[time_col, val_col]]
        df.columns = [COL_TIMESTAMP, COL_VALUE]
        return df
    raise RuntimeError('cannot identify the time and value columns in the dataframe')


def is_unix_epoch_timestamp(timestamp):
    try:
        # Check if it can be converted to a number
        float(timestamp)
        # Additional check for a reasonable range
        # (assuming timestamps between 1970 and 2100)
        return 0 < float(timestamp) < 4102444800
    except (TypeError, ValueError):
        return False
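
# Illustrative sketch (not part of the original module) of how the column helpers
# above fit together; the demo dataframe below is invented for the example:
#
#     demo = pd.DataFrame({'_time': ['2021-01-01 00:00:00', '2021-01-01 00:05:00'],
#                          'count': [1.0, 2.0]})
#     assert is_time_column(demo, 0) and is_val_column(demo, 1)
#     demo = identify_time_and_value_columns(demo)
#     assert list(demo.columns) == [COL_TIMESTAMP, COL_VALUE]
#     assert is_unix_epoch_timestamp('1609459200') and not is_unix_epoch_timestamp('n/a')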

def is_mixed_timezones(timestamps):
    """
    Ref for "Timezones and time offsets":
    https://pandas.pydata.org/docs/reference/api/pandas.to_datetime.html#to-datetime-tz-examples
    """
    if is_unix_epoch_timestamp(timestamps.iloc[0]):
        return False
    datetimes = pd.to_datetime(timestamps, infer_datetime_format=True)
    # In the case of mixed timezones, pd.to_datetime(..) defaults to returning a Series of generic objects
    return datetimes.dtype == np.dtype(object)


def aug_datetime(df, unit="ms", drop_raw_timestamp=True):
    is_mix_tz = is_mixed_timezones(df[COL_TIMESTAMP])
    try:
        df[COL_DATE] = pd.to_datetime(df[COL_TIMESTAMP], unit=unit, utc=is_mix_tz)
    except Exception:
        df[COL_DATE] = pd.to_datetime(df[COL_TIMESTAMP], infer_datetime_format=True, utc=is_mix_tz)
    if drop_raw_timestamp:
        df.drop(COL_TIMESTAMP, axis=1, inplace=True)
    df[COL_DAY_OF_WEEK] = df[COL_DATE].dt.dayofweek
    df[COL_HOUR] = df[COL_DATE].dt.hour
    df.dropna(inplace=True)  # TODO: try to remove the dropna
    df[COL_DAY_OF_WEEK] = df[COL_DAY_OF_WEEK].astype("uint8")
    df[COL_HOUR] = df[COL_HOUR].astype("uint8")


def load_time_series_from_file(filename, filterCol=None):
    with open(filename) as f:
        df = pd.read_csv(f)
    if filterCol is not None and len(filterCol) == 2:
        col_name, col_value = filterCol
        df = df[df[col_name] == col_value]
    df = identify_time_and_value_columns(df)
    aug_datetime(df)
    df = df.set_index(COL_DATE)
    df = df.sort_index()
    return df


def get_resolution(df):
    # Estimate the resolution as the most frequent timestamp delta over (at most)
    # the first 1000 points, skipping zero deltas caused by duplicate timestamps.
    ts_diff = df.index[:min(df.shape[0], 1000)].to_series().diff()
    frequent_timestamp_delta_sorted = ts_diff.value_counts()
    if frequent_timestamp_delta_sorted.index[0] == pd.Timedelta(seconds=0):
        assert len(frequent_timestamp_delta_sorted) > 1
        return frequent_timestamp_delta_sorted.index[1]
    else:
        return frequent_timestamp_delta_sorted.index[0]


def down_sample(df, resolution='15min', df_resolution=None):
    # resolution is a 'rule' supported by
    # https://pandas.pydata.org/docs/reference/api/pandas.Series.resample.html
    # examples: 3min or 3T for a resolution of 3 minutes,
    #           30S for a resolution of 30 seconds
    if not df_resolution:
        df_resolution = get_resolution(df)
    if df_resolution > (pd.Timedelta(resolution) if isinstance(resolution, str) else resolution):
        logger.warning(f'The downsampling target resolution of {resolution} is finer than the resolution '
                       f'of the input time series of {df_resolution}; no change is made to the resolution '
                       f'of the input time series.')
        return df
    # Replace missing values with forward filling.
    # Median filling and filling with 0 were also considered; forward filling was
    # chosen because it requires less domain knowledge and fewer assumptions.
    df_resampled = df.resample(resolution).mean()
    df_resampled[COL_VALUE].fillna(method="ffill", inplace=True)
    df_resampled[COL_DAY_OF_WEEK].fillna(method="ffill", inplace=True)
    df_resampled[COL_HOUR].fillna(method="ffill", inplace=True)
    df_resampled[COL_DAY_OF_WEEK] = df_resampled[COL_DAY_OF_WEEK].astype("uint8")
    df_resampled[COL_HOUR] = df_resampled[COL_HOUR].astype("uint8")
    return df_resampled


# Remove trend on the basis of daily medians, so that trends from day to day are
# removed but differences among hours within a day are kept.
def detrend_daily(df, offset=0):
    rslt = df.copy()
    overall_med = df[COL_VALUE].median()  # use the median to be robust to extreme values
    dfre = resample_with_offset(df, '24h', offset)
    for _, sub_df in dfre:
        med = sub_df[COL_VALUE].median()
        rslt.loc[sub_df.index, COL_VALUE] = sub_df[COL_VALUE] - med + overall_med
    return rslt
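
# A minimal sketch (illustrative only; the drifting series below is invented) of
# how detrend_daily() preserves intraday shape: each 24h bucket is shifted so its
# median matches the overall median, removing slow day-to-day drift while leaving
# the hour-of-day profile within each day untouched.
#
#     idx = pd.date_range('2021-01-01', periods=4 * 24, freq='h')
#     drifting = pd.DataFrame({COL_VALUE: np.arange(len(idx)) * 0.1}, index=idx)
#     detrended = detrend_daily(drifting)
#     # each day's median in `detrended` now equals drifting[COL_VALUE].median()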

def resample_with_offset(df, resolution, offset_hours):
    pandas_version = pd.__version__
    comparison = compare_versions(pandas_version, '1.1.0')
    if comparison < 0:
        # This means we are using pandas < 1.1.0
        dfre = df.resample(resolution, base=offset_hours)
    else:
        # This means we are using pandas >= 1.1.0
        dfre = df.resample(resolution, offset=f'{offset_hours}h')
    return dfre


# convert anomaly labels represented as strings to datetime
def anomaly_labels_to_datetime(anomaly_labels, timestamp_format):
    anomaly_datetime = []
    for anomaly in anomaly_labels:
        if isinstance(anomaly, tuple):
            # anomaly region
            anomaly_datetime.append([datetime.strptime(endpoint, timestamp_format) for endpoint in anomaly])
        else:
            anomaly_datetime.append(datetime.strptime(anomaly, timestamp_format))
    return anomaly_datetime


def smooth(xs, frac):
    filtered = lowess(xs, range(len(xs)), is_sorted=True, frac=frac, it=0)
    return [x[1] for x in filtered]
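
# Illustrative sketches (not part of the original module); the values below are
# invented. smooth() runs a LOWESS pass using positional x-coordinates, so frac
# controls how many neighboring points influence each output:
#
#     noisy = [1.0, 1.2, 5.0, 1.1, 0.9, 1.0]
#     smooth(noisy, frac=0.5)  # -> list of 6 locally-weighted estimates
#
# anomaly_labels_to_datetime() accepts a mix of single timestamps and
# (start, end) region tuples:
#
#     anomaly_labels_to_datetime(
#         ['2021-01-01 00:00:00', ('2021-01-02 00:00:00', '2021-01-03 00:00:00')],
#         NAB_TIMESTAMP_FORMAT)
#     # -> [datetime(2021, 1, 1), [datetime(2021, 1, 2), datetime(2021, 1, 3)]]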

def remove_super_spikes(df):
    """
    As a pre-processing step, remove super-high spikes so that the calculation
    of thresholds is not distorted by them.
    """
    df_resolution = get_resolution(df)
    win_length = int(pd.Timedelta(days=1) / df_resolution)  # one-day-long window
    half_day = pd.Timedelta(hours=12)
    thresh_regularity = max(
        min(
            THRESH_SUPER_SPIKE_REGULARITY,
            int((df.index[-1] - df.index[0]) / pd.Timedelta(days=1)) - 1
        ),
        4
    )
    values = df[COL_VALUE].to_numpy()
    mid = np.median(values)
    variation_unit = iqr(values)
    if variation_unit == 0:
        variation_unit = values.std() + 1e-6
    deviation_normalized = np.abs(df[COL_VALUE] - mid) / variation_unit
    frac = 10 / len(deviation_normalized)
    smoothed = pd.Series(smooth(deviation_normalized.values, frac), index=deviation_normalized.index)
    deviation_normalized.update(smoothed)

    def _count_high_spikes_same_hour_other_day(idx_in, similar_spikes):
        cnt = 0
        # Here the type of the difference between two timestamp indexes is pandas.Timedelta.
        # There is no well-defined 'abs', and the negative case is also tricky:
        # https://stackoverflow.com/questions/31836788/subtracting-pandas-timestamps-absolute-value
        other_spikes = similar_spikes.loc[
            # select rows at least half a day away and with an hour-of-day within one hour
            ((similar_spikes.index - idx_in > half_day) | (idx_in - similar_spikes.index > half_day))
            & (abs(similar_spikes.index.hour - idx_in.hour) <= 1)
        ]
        if len(other_spikes) == 0:
            return cnt
        cnt = 1
        other_spikes_idx = other_spikes.index
        idx_pre = other_spikes_idx[0]
        for idx in other_spikes_idx:
            if idx - idx_pre > df_resolution:
                # consecutive points do NOT count as multiple spikes
                cnt += 1
            idx_pre = idx
        return cnt

    def _is_local_minimal(values, idx):
        if idx == 0:
            return values.iloc[idx] < values.iloc[idx + 1]
        elif idx == len(values) - 1:
            return values.iloc[idx] < values.iloc[idx - 1]
        else:
            return (values.iloc[idx] < values.iloc[idx + 1]) and (values.iloc[idx] < values.iloc[idx - 1])

    def _locate_spike_range(idx_i):
        if isinstance(idx_i, int):
            left = idx_i
            right = idx_i
        else:
            # idx_i could be a slice, if df.index.get_loc does not get a unique match
            left = idx_i.start
            right = idx_i.stop
        while left > 0 and deviation_normalized.iloc[left] > 0 and (
                deviation_normalized.iloc[left] > THRESH_SUPER_SPIKE
                or not _is_local_minimal(deviation_normalized, left)):
            left -= 1
        while right <= len(df) - 1 and deviation_normalized.iloc[right] > 0 and (
                deviation_normalized.iloc[right] > THRESH_SUPER_SPIKE
                or not _is_local_minimal(deviation_normalized, right)):
            right += 1
        return left, right

    def _calc_neighbor_values(left, right):
        left_neighbor = df[COL_VALUE].iloc[max(0, left - win_length):left]
        left_mean, left_std = left_neighbor.mean(), left_neighbor.std()
        right_neighbor = df[COL_VALUE].iloc[right:min(right + win_length, len(df))]
        right_mean, right_std = right_neighbor.mean(), right_neighbor.std()
        if len(left_neighbor) == 0:
            neighbor_mean = right_mean
        elif len(right_neighbor) == 0:
            neighbor_mean = left_mean
        else:
            if left_std + right_std > 0:
                # weight each side's mean by the other side's spread, so the
                # steadier neighborhood contributes more
                neighbor_mean = (left_mean * right_std + right_mean * left_std) / (left_std + right_std)
            else:
                neighbor_mean = (left_mean + right_mean) / 2
        return neighbor_mean
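    # Main removal loop (descriptive comment added for clarity): repeatedly take
    # the highest remaining normalized deviation, grow it into a full spike range,
    # and either replace it with a neighborhood mean (irregular spike) or
    # whitelist it as a regular pattern recurring at the same hour on other days.
    # The iteration budget of 1% of the series length bounds the work on
    # pathological inputs.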
    cnt_iter = 0
    iter_limit = int(len(df) * 0.01)
    cnt_removed_spikes = 0
    removed_spikes = []
    regular_spikes = []
    while True:
        cnt_iter += 1
        if cnt_iter >= iter_limit:
            break
        if len(regular_spikes) > 0:
            deviation_normalized_filter_regular = deviation_normalized[~deviation_normalized.index.isin(regular_spikes)]
        else:
            deviation_normalized_filter_regular = deviation_normalized
        if len(deviation_normalized_filter_regular) == 0:
            break
        max_val = deviation_normalized_filter_regular.max()
        if max_val < THRESH_SUPER_SPIKE:
            break
        max_idx = deviation_normalized_filter_regular.idxmax()
        idx_i = deviation_normalized.index.get_loc(max_idx)
        # need to remove the whole spike, not just its highest part
        left, right = _locate_spike_range(idx_i)
        similar_spikes = df[(deviation_normalized > max(0.5 * max_val, THRESH_SUPER_SPIKE))
                            & (deviation_normalized < 2.0 * max_val)]
        # Only remove the super spikes that are unlikely to happen regularly on multiple days
        regularity_count = _count_high_spikes_same_hour_other_day(max_idx, similar_spikes)
        if regularity_count < thresh_regularity:
            # idx_i could be a slice when df.index.get_loc does not get a unique match
            idx_i = idx_i if isinstance(idx_i, int) else idx_i.start
            removed_spikes.append(
                f'({max_idx}, {df[COL_VALUE].iloc[idx_i]:.3f}, {deviation_normalized.iloc[idx_i]:.3f})')
            df.loc[df.index[left:right], COL_VALUE] = _calc_neighbor_values(left, right)
            deviation_normalized.iloc[left:right] = -0.3
            cnt_removed_spikes += 1
        else:
            regular_spikes += deviation_normalized.index[left:right].tolist()
    if cnt_removed_spikes > 0:
        logger.debug(f'cnt_removed_spikes={cnt_removed_spikes} (cnt_iter={cnt_iter}) {removed_spikes}')
    return df
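
# Usage sketch (illustrative only; the dataframe is assumed to come from
# load_time_series_from_file / down_sample, i.e. datetime-indexed with a
# 'value' column):
#
#     df = remove_super_spikes(df)
#
# remove_super_spikes mutates and returns the same dataframe: isolated spikes
# above THRESH_SUPER_SPIKE normalized deviations are replaced with a local
# neighborhood mean, while spikes recurring at the same hour on several days
# are kept as legitimate seasonality.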